hhnf333 发表于 2018-5-28 10:57:04

docker1.12

  从原openstack转型至docker已有一段时间。更稳定的使用docker了解docker的各流程,从源代码层面了解下containerd。本文基于docker 1.12版本,从1.11开始docker已拆分docker daemon
containerd源码流程图

源码接口调用详情
A)第一步从ctr入口至API接口
checkpoint(用于快照,docker目前该功能不完善)
list --> /types.API/ListCheckpoint
create --> /types.API/CreateCheckpoint
delete --> /types.API/DeleteCheckpoint
containers
list、state -->/types.API/State
pause、resume、update -->/types.API/UpdateContainer
create --> /types.API/CreateContainer
stats --> /types.API/Stats
watch -->/types.API/State 、/types.API/Events
exec -->/types.API/Events 、/types.API/AddProcess 、/types.API/UpdateProcess
kill -->/types.API/Signal
start -->/types.API/Events 、 /types.API/CreateContainer 、/types.API/UpdateProcess
update -->/types.API/UpdateContainerevents
/types.API/Eventsstate
/types.API/Stateversion
/types.API/GetServerVersion --return resultB)第二步从API接口至supervisor任务处理
注:API--server.go --> daemon – supervisor.go(handleTask func)checkpoint
/types.API/ListCheckpoint (supervisor.GetContainersTask)--> getContainers
/types.API/CreateCheckpoint --> createCheckpoint
/types.API/DeleteCheckpoint --> deleteCheckpointcontainers
/types.API/State /types.API/Stats (supervisor.GetContainersTask)--> getContainers
/types.API/UpdateContainer (supervisor.UpdateTask)-->updateContainer
/types.API/CreateContainer (supervisor.StartTask)-->start
/types.API/Events --> Events --return result
/types.API/AddProcess -->addProcess
/types.API/UpdateProcess -->updateProcess
/types.API/Signal -->signal

C)第三步从任务队列至runtime至runc
checkpoint
getContainers -- return result
createCheckpoint -->(runtime)CheckPoint -->exec.Command(c.runtime,arg....)
deleteCheckpoint -->(runtime)DeleteCheckpoint -- return resultcontainers
getContainers -- return result
updateContainer -->(runtime)Resume Pause UpdateResources-->exec.Command(c.runtime,arg....)
start -->(runtime supervisor/worker.go) Start -->exec.Command(c.shim,c.id,c.bundle,c.runtime)
addProcess -->(runtime) exec --> exec.Command(c.shim,c.id,c.bundle,c.runtime)
updateProcess -->return result
signal -->return result  

  createContainer示例

deamon启动监听tasks及startTasks进程
  a)进入main.go main方法调用daemon方法
app.Action = func(context *cli.Context) {
       if err := daemon(context); err != nil {
            logrus.Fatal(err)
       }

}  b)进入main.go daemon方法
for i := 0; i < 10; i++ {
       wg.Add(1)
       w := supervisor.NewWorker(sv, wg)
       go w.Start()
}
if err := sv.Start(); err != nil {
       return err
}  c)初始化supervisor/worker.go NewWorker并启动监听startTask并处理
func NewWorker(s *Supervisor, wg *sync.WaitGroup) Worker {
       return &worker{
            s:s,
            wg: wg,
       }
}

func (w *worker) Start() {
       defer w.wg.Done()
       for t := range w.s.startTasks {
            started := time.Now()
            process, err := t.Container.Start(t.CheckpointPath, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
            if err != nil {
                     logrus.WithFields(logrus.Fields{
                            "error": err,
                            "id":    t.Container.ID(),
                     }).Error("containerd: start container")
                     t.Err <- err
                     evt := &DeleteTask{
                            ID:      t.Container.ID(),
                            NoEvent: true,
                            Process: process,
                     }
                     w.s.SendTask(evt)
                     continue
            }  d)启动supervisor/supervisor.go task监听task并处理
func (s *Supervisor) Start() error {
       logrus.WithFields(logrus.Fields{
            "stateDir":    s.stateDir,
            "runtime":   s.runtime,
            "runtimeArgs": s.runtimeArgs,
            "memory":      s.machine.Memory,
            "cpus":      s.machine.Cpus,
       }).Debug("containerd: supervisor running")
       go func() {
            for i := range s.tasks {
                     s.handleTask(i)
            }  
containers容器创建示例
Ctl控制台命令入口
  ctr/main.gocontainersCommand
execCommand,
killCommand,
listCommand,
pauseCommand,
resumeCommand,
startCommand,
stateCommand,
statsCommand,
watchCommand,
updateCommand,  ctr/container.go
var startCommand = cli.Command{
       Name:      "start",
       Usage:   "start a container",
       ArgsUsage: "ID BundlePath”, ————…...

      events, err := c.Events(netcontext.Background(), &types.EventsRequest{})/*事件创建*/
       if err != nil {
            fatal(err.Error(), 1)
       }
       if _, err := c.CreateContainer(netcontext.Background(), r); err != nil {/*容器创建*/
            fatal(err.Error(), 1)
       }
       if context.Bool("attach") {
            go func() {
                     io.Copy(stdin, os.Stdin)
                     if _, err := c.UpdateProcess(netcontext.Background(), &types.UpdateProcessRequest{/*更新进程*/
                            Id:         id,
                            Pid:      "init",
                            CloseStdin: true,
                     }); err != nil {
                            fatal(err.Error(), 1)
                     }
                     restoreAndCloseStdin()
            }()
            if tty {
                     resize(id, "init", c)
                     go func() {
                            s := make(chan os.Signal, 64)
                            signal.Notify(s, syscall.SIGWINCH)
                            for range s {
                                 if err := resize(id, "init", c); err != nil {
                                          log.Println(err)
                                 }
                            }
                     }()
            }
            waitForExit(c, events, id, "init", restoreAndCloseStdin)
       }
},  
api处理
  api/grpc/types/api.pb.go
func (c *aPIClient) Events(ctx context.Context, in *EventsRequest, opts ...grpc.CallOption) (API_EventsClient, error) {
       stream, err := grpc.NewClientStream(ctx, &_API_serviceDesc.Streams, c.cc, "/types.API/Events", opts...)
       if err != nil {
            return nil, err
       }
       x := &aPIEventsClient{stream}
       if err := x.ClientStream.SendMsg(in); err != nil {
            return nil, err
       }
       if err := x.ClientStream.CloseSend(); err != nil {
            return nil, err
       }
       return x, nil
}

func (c *aPIClient) CreateContainer(ctx context.Context, in *CreateContainerRequest, opts ...grpc.CallOption) (*CreateContainerResponse, error) {
       out := new(CreateContainerResponse)
       err := grpc.Invoke(ctx, "/types.API/CreateContainer", in, out, c.cc, opts...)
       if err != nil {
            return nil, err
       }
       return out, nil
}

func (c *aPIClient) UpdateProcess(ctx context.Context, in *UpdateProcessRequest, opts ...grpc.CallOption) (*UpdateProcessResponse, error) {
       out := new(UpdateProcessResponse)
       err := grpc.Invoke(ctx, "/types.API/UpdateProcess", in, out, c.cc, opts...)
       if err != nil {
            return nil, err
       }
       return out, nil
}  api/grpc/types/api.pb.go
func _API_Events_Handler(srv interface{}, stream grpc.ServerStream) error {
       m := new(EventsRequest)
       if err := stream.RecvMsg(m); err != nil {
            return err
       }
       return srv.(APIServer).Events(m, &aPIEventsServer{stream})
}

func _API_CreateContainer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
       in := new(CreateContainerRequest)
       if err := dec(in); err != nil {
            return nil, err
       }
       if interceptor == nil {
            return srv.(APIServer).CreateContainer(ctx, in)
       }
       info := &grpc.UnaryServerInfo{
            Server:   srv,
            FullMethod: "/types.API/CreateContainer",
       }
       handler := func(ctx context.Context, req interface{}) (interface{}, error) {
            return srv.(APIServer).CreateContainer(ctx, req.(*CreateContainerRequest))
       }
       return interceptor(ctx, in, info, handler)
}

func _API_UpdateProcess_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
       in := new(UpdateProcessRequest)
       if err := dec(in); err != nil {
            return nil, err
       }
       if interceptor == nil {
            return srv.(APIServer).UpdateProcess(ctx, in)
       }
       info := &grpc.UnaryServerInfo{
            Server:   srv,
            FullMethod: "/types.API/UpdateProcess",
       }
       handler := func(ctx context.Context, req interface{}) (interface{}, error) {
            return srv.(APIServer).UpdateProcess(ctx, req.(*UpdateProcessRequest))
       }
       return interceptor(ctx, in, info, handler)  api/grpc/server/server.go 进入第一步中的tasks及sendTasks处理队列
func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer) error {
events := s.sv.Events(t, r.StoredOnly, r.Id)

func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContainerRequest) (*types.CreateContainerResponse, error) {
s.sv.SendTask(e)
apiC, err := createAPIContainer(r.Container, false)

func (s *apiServer) UpdateProcess(ctx context.Context, r *types.UpdateProcessRequest) (*types.UpdateProcessResponse, error) {
       e := &supervisor.UpdateProcessTask{}
       e.ID = r.Id
       e.PID = r.Pid
       e.Height = int(r.Height)
       e.Width = int(r.Width)
       e.CloseStdin = r.CloseStdin
       s.sv.SendTask(e)
       if err := <-e.ErrorCh(); err != nil {
            return nil, err
       }
       return &types.UpdateProcessResponse{}, nil
}  supervisor/create.go
func (s *Supervisor) start(t *StartTask) error {
s.startTasks <- task  supervisor/worker.go
func (w *worker) Start() {
       defer w.wg.Done()
       for t := range w.s.startTasks {  runtime/container.go
func (c *container) Start(checkpointPath string, s Stdio) (Process, error) {
       processRoot := filepath.Join(c.root, c.id, InitProcessID)
       if err := os.Mkdir(processRoot, 0755); err != nil {
            return nil, err
       }
       cmd := exec.Command(c.shim,
            c.id, c.bundle, c.runtime,
       ) ---执行 docker-containerd-shim命令
       cmd.Dir = processRoot
       cmd.SysProcAttr = &syscall.SysProcAttr{
            Setpgid: true,
       }
       spec, err := c.readSpec()
       if err != nil {
            return nil, err
       }
       config := &processConfig{
            checkpoint:checkpointPath,
            root:      processRoot,
            id:          InitProcessID,
            c:         c,
            stdio:       s,
            spec:      spec,
            processSpec: specs.ProcessSpec(spec.Process),
       }
       p, err := newProcess(config)
       if err != nil {
            return nil, err
       }
       if err := c.createCmd(InitProcessID, cmd, p); err != nil {
            return nil, err
       }
       return p, nil
}  
  containerd-shim接收后处理
  containerd-shim/main.go
func start(log *os.File) error {

p, err := newProcess(flag.Arg(0), flag.Arg(1), flag.Arg(2))
if err != nil {
       return err
}
defer func() {
       if err := p.Close(); err != nil {
            writeMessage(log, "warn", err)
       }
}()
if err := p.create(); err != nil {
       p.delete()
       return err
}  containerd-shim/process.go跳转执行runc命令
func (p *process) create() error {
cmd := exec.Command(p.runtime, args...)  
页: [1]
查看完整版本: docker1.12