containerd-v1.7.0
本篇正式开始分析 CRI 插件的启用（初始化与启动）流程。

源码分析

初始化入口

pkg/cri/cri.go:42

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// init registers the CRI service with the plugin framework as the "cri"
// gRPC plugin. Only the registration happens here; initCRIService is handed
// over as the plugin's InitFn and is what actually builds the service.
func init() {
	// The plugin starts from the CRI default configuration; the registration
	// carries a pointer to it.
	defaultCfg := criconfig.DefaultConfig()

	// Plugin types the CRI plugin declares in Requires. NOTE(review):
	// presumably the framework initializes these before calling InitFn —
	// confirm in the plugin package.
	deps := []plugin.Type{
		plugin.EventPlugin,
		plugin.ServicePlugin,
		plugin.NRIApiPlugin,
	}

	plugin.Register(&plugin.Registration{
		Type:     plugin.GRPCPlugin,
		ID:       "cri",
		Config:   &defaultCfg,
		Requires: deps,
		InitFn:   initCRIService,
	})
}

CRIService 初始化流程

pkg/cri/cri.go:57

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// initCRIService is the InitFn of the "cri" gRPC plugin. It validates the
// plugin config, assembles the full CRI config, creates a containerd client
// backed by in-process services, builds either the experimental (sbserver) or
// the legacy (server) CRI implementation, and runs it in a background
// goroutine. Parts of the function are elided with "..." in this excerpt.
func initCRIService(ic *plugin.InitContext) (interface{}, error) {
...
// Context handed over by the plugin framework.
ctx := ic.Context
// Plugin configuration. The unchecked type assertion panics if ic.Config is
// not a *criconfig.PluginConfig.
pluginConfig := ic.Config.(*criconfig.PluginConfig)
// Reject an invalid plugin configuration before anything is built.
if err := criconfig.ValidatePluginConfig(ctx, pluginConfig); err != nil {
return nil, fmt.Errorf("invalid plugin config: %w", err)
}
// Full CRI config: the plugin settings plus paths and address taken from the
// init context. ContainerdRootDir is the parent directory of this plugin's
// root directory.
c := criconfig.Config{
PluginConfig: *pluginConfig,
ContainerdRootDir: filepath.Dir(ic.Root),
ContainerdEndpoint: ic.Address,
RootDir: ic.Root,
StateDir: ic.State,
}
...
// Build the containerd client used by the CRI service: Kubernetes namespace
// by default, default platform, and no address (empty string).
client, err := containerd.New(
"",
containerd.WithDefaultNamespace(constants.K8sContainerdNamespace),
containerd.WithDefaultPlatform(platforms.Default()),
// WithInMemoryServices is meant for plugins (such as CRI) that need a
// containerd client backed by other in-process containerd plugins.
containerd.WithInMemoryServices(ic),
)
...
// Choose the implementation: any non-empty ENABLE_CRI_SANDBOXES selects the
// experimental sbserver; otherwise the legacy server is used. Both receive the
// same config, client and NRI API.
var s server.CRIService
if os.Getenv("ENABLE_CRI_SANDBOXES") != "" {
log.G(ctx).Info("using experimental CRI Sandbox server - unset ENABLE_CRI_SANDBOXES to disable")
s, err = sbserver.NewCRIService(c, client, getNRIAPI(ic))
} else {
log.G(ctx).Info("using legacy CRI server")
s, err = server.NewCRIService(c, client, getNRIAPI(ic))
}
...
// Run the service in the background so plugin initialization can return.
// A Run error is logged at Fatal level (with logrus this exits the process).
go func() {
if err := s.Run(); err != nil {
log.G(ctx).WithError(err).Fatal("Failed to run CRI service")
}
// TODO(random-liu): Whether and how we can stop containerd.
}()
...
}

关于 NRI 的介绍，详见《NRI：下一代节点细粒度资源控制方案》。

构造 CRIService 服务

criService 结构体定义

pkg/cri/server/service.go:71

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// criService holds the state and dependencies of the CRI service: config,
// in-memory stores, the containerd client, networking, streaming and event
// machinery.
// NOTE(review): NewCRIService in this article assigns to c.sandboxControllers,
// which this struct does not declare — confirm which source file each excerpt
// was taken from.
type criService struct {
// config is the CRI configuration.
config criconfig.Config
// imageFSPath is the path of the image filesystem.
imageFSPath string
// os abstracts OS-level operations (presumably so they can be faked in tests).
os osinterface.OS
// sandboxStore stores sandbox resources.
sandboxStore *sandboxstore.Store
// sandboxNameIndex records every sandbox name to keep names unique.
sandboxNameIndex *registrar.Registrar
// containerStore stores container resources.
containerStore *containerstore.Store
// containerNameIndex records every container name to keep names unique.
containerNameIndex *registrar.Registrar
// imageStore stores image resources.
imageStore *imagestore.Store
// snapshotStore stores snapshot information.
snapshotStore *snapshotstore.Store
// netPlugin holds CNI plugins by name; used to set up and tear down the
// network when a pod sandbox is run or stopped.
netPlugin map[string]cni.CNI
// client is the containerd client.
client *containerd.Client
// streamServer serves container streaming requests.
streamServer streaming.Server
// eventMonitor monitors containerd events.
eventMonitor *eventMonitor
// initialized reports whether the server has finished initializing; all
// gRPC services must return an error until it is set.
initialized atomic.Bool
// cniNetConfMonitor holds, per network plugin, a syncer that reloads the CNI
// network config when valid changes appear in its conf dir.
cniNetConfMonitor map[string]*cniNetConfSyncer
// baseOCISpecs caches the OCI specs referenced by Runtime.BaseRuntimeSpec.
baseOCISpecs map[string]*oci.Spec
// allCaps is the list of capabilities; per the original note, when empty it
// is parsed from /proc/self/status.
allCaps []string
// unpackDuplicationSuppressor is a keyed lock so that only one fetch request
// or unpack handler works on a given key at a time.
unpackDuplicationSuppressor kmutex.KeyedLocker
// nri is used to call back into NRI while handling CRI requests.
nri *nri.API
// containerEventsChan carries container events to GetContainerEvents callers.
containerEventsChan chan runtime.ContainerEventResponse
}

NewCRIService 构造

pkg/cri/server/service.go:123

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// NewCRIService constructs the CRI service from the CRI config, a containerd
// client and the NRI API handle. It only builds and wires the components;
// nothing is started until Run is called.
//
// It returns an error when the configured snapshotter is not available
// through client, when platform initialization fails, when the streaming
// server or a CNI network conf syncer cannot be created, or when the base OCI
// specs cannot be loaded.
func NewCRIService(config criconfig.Config, client *containerd.Client, nri *nri.API) (CRIService, error) {
	var err error
	// The sandbox and container stores share one label store.
	labels := label.NewStore()
	// sandboxControllers must be allocated here: controllers are assigned into
	// it near the end of this function, and assigning into a nil map panics.
	// NOTE(review): element type assumed to be sandbox.Controller (what
	// client.SandboxController() returns) — keep it in sync with the field.
	c := &criService{
		config:                      config,
		client:                      client,
		os:                          osinterface.RealOS{},
		sandboxStore:                sandboxstore.NewStore(labels),
		containerStore:              containerstore.NewStore(labels),
		imageStore:                  imagestore.NewStore(client),
		snapshotStore:               snapshotstore.NewStore(),
		sandboxNameIndex:            registrar.NewRegistrar(),
		containerNameIndex:          registrar.NewRegistrar(),
		initialized:                 atomic.NewBool(false),
		netPlugin:                   make(map[string]cni.CNI),
		unpackDuplicationSuppressor: kmutex.New(),
		sandboxControllers:          make(map[criconfig.SandboxControllerMode]sandbox.Controller),
	}

	// TODO: figure out a proper channel size.
	c.containerEventsChan = make(chan runtime.ContainerEventResponse, 1000)

	// Fail fast if the configured snapshotter is not available via the client.
	if client.SnapshotService(c.config.ContainerdConfig.Snapshotter) == nil {
		return nil, fmt.Errorf("failed to find snapshotter %q", c.config.ContainerdConfig.Snapshotter)
	}

	// The image filesystem path derives from the containerd root dir and the
	// snapshotter name.
	c.imageFSPath = imageFSPath(config.ContainerdRootDir, config.ContainerdConfig.Snapshotter)
	logrus.Infof("Get image filesystem path %q", c.imageFSPath)

	// Platform-specific initialization (per the original note, there is also a
	// fallback for platforms other than Linux and Windows). NOTE(review):
	// presumably this populates c.netPlugin, which the CNI loop below ranges
	// over — confirm in the per-platform implementations.
	if err := c.initPlatform(); err != nil {
		return nil, fmt.Errorf("initialize platform: %w", err)
	}

	// Prepare the streaming server.
	c.streamServer, err = newStreamServer(c, config.StreamServerAddress, config.StreamServerPort, config.StreamIdleTimeout)
	if err != nil {
		return nil, fmt.Errorf("failed to create stream server: %w", err)
	}

	c.eventMonitor = newEventMonitor(c)

	// One CNI conf syncer per network plugin. Non-default plugins use the conf
	// dir of the runtime with the same name when that runtime is configured;
	// plugins that end up with an empty conf dir get no syncer.
	c.cniNetConfMonitor = make(map[string]*cniNetConfSyncer)
	for name, i := range c.netPlugin {
		path := c.config.NetworkPluginConfDir
		if name != defaultNetworkPlugin {
			if rc, ok := c.config.Runtimes[name]; ok {
				path = rc.NetworkPluginConfDir
			}
		}
		if path != "" {
			m, err := newCNINetConfSyncer(path, i, c.cniLoadOptions())
			if err != nil {
				return nil, fmt.Errorf("failed to create cni conf monitor for %s: %w", name, err)
			}
			c.cniNetConfMonitor[name] = m
		}
	}

	// Preload base OCI specs.
	c.baseOCISpecs, err = loadBaseOCISpecs(&config)
	if err != nil {
		return nil, err
	}

	// Register the sandbox controllers: the pod sandbox controller and the
	// remote shim controller exposed by the client.
	c.sandboxControllers[criconfig.ModePodSandbox] = podsandbox.New(config, client, c.sandboxStore, c.os, c, c.baseOCISpecs)
	c.sandboxControllers[criconfig.ModeShim] = client.SandboxController()

	c.nri = nri

	return c, nil
}

关于 SELinux Label 的详细介绍，详见《SELinux-Label详解》。

启动 CRI Service

pkg/cri/server/service.go:207

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// Run starts every background component of the CRI service, then blocks until
// one of them reports back and shuts the service down.
//
// Startup order:
//   - subscribe to containerd events
//   - recover state
//   - start the event monitor and the snapshot stats syncer
//   - start one goroutine per CNI network conf syncer
//   - start the streaming server
//   - register with NRI
//   - mark the service initialized so gRPC services start working
//
// Shutdown: after the first value arrives from the event monitor, the stream
// server or the CNI syncers, Run calls Close. If Close fails, that error is
// returned. Otherwise Run waits on the event monitor and stream server
// channels again. It then returns the first non-nil error, checked in this
// order: event monitor, stream server, CNI syncers. It returns nil if none of
// them reported an error.
func (c *criService) Run() error {
logrus.Info("Start subscribing containerd event")
// Register the event subscriber with containerd. NOTE(review): subscribing
// before recovery presumably avoids missing events emitted while recovering
// — confirm.
c.eventMonitor.subscribe(c.client)

logrus.Infof("Start recovering state")
// Recover system state from containerd and status checkpoints.
if err := c.recover(ctrdutil.NamespacedContext()); err != nil {
return fmt.Errorf("failed to recover state: %w", err)
}

// Start event handler.
logrus.Info("Start event monitor")
// Start the event monitor; its error channel is watched below.
eventMonitorErrCh := c.eventMonitor.start()

// Start snapshot stats syncer, it doesn't need to be stopped.
logrus.Info("Start snapshots syncer")
// The syncer feeds snapshotStore from the configured snapshotter, with an
// interval of StatsCollectPeriod seconds.
snapshotsSyncer := newSnapshotsSyncer(
c.snapshotStore,
c.client.SnapshotService(c.config.ContainerdConfig.Snapshotter),
time.Duration(c.config.StatsCollectPeriod)*time.Second,
)
snapshotsSyncer.start()

// Start one goroutine per CNI network conf syncer. The channel is buffered to
// the number of syncers, so each goroutine's single send never blocks.
cniNetConfMonitorErrCh := make(chan error, len(c.cniNetConfMonitor))
var netSyncGroup sync.WaitGroup
for name, h := range c.cniNetConfMonitor {
netSyncGroup.Add(1)
logrus.Infof("Start cni network conf syncer for %s", name)
go func(h *cniNetConfSyncer) {
cniNetConfMonitorErrCh <- h.syncLoop()
netSyncGroup.Done()
}(h)
}

// Close the CNI error channel once every syncer has returned. With no syncers
// the channel is never written or closed, so its select case below never fires.
if len(c.cniNetConfMonitor) > 0 {
go func() {
netSyncGroup.Wait()
close(cniNetConfMonitorErrCh)
}()
}

// Start the streaming server. The goroutine always closes its channel on
// exit. http.ErrServerClosed (a deliberate shutdown) is not reported as an
// error. The channel is unbuffered, so a real error send waits until Run
// receives it.
logrus.Info("Start streaming server")
streamServerErrCh := make(chan error)
go func() {
defer close(streamServerErrCh)
if err := c.streamServer.Start(true); err != nil && err != http.ErrServerClosed {
logrus.WithError(err).Error("Failed to start streaming server")
streamServerErrCh <- err
}
}()

// Register the CRI domain with NRI.
// NOTE(review): this early return does not stop the components started above;
// presumably acceptable because the caller treats a Run error as fatal — confirm.
if err := c.nri.Register(&criImplementation{c}); err != nil {
return fmt.Errorf("failed to set up NRI for CRI service: %w", err)
}

// Mark the server initialized; from here on the gRPC services do real work.
c.initialized.Set()

// Block until the first component reports (an error value or a channel close).
var eventMonitorErr, streamServerErr, cniNetConfMonitorErr error
select {
case eventMonitorErr = <-eventMonitorErrCh:
case streamServerErr = <-streamServerErrCh:
case cniNetConfMonitorErr = <-cniNetConfMonitorErrCh:
}
// Stop the service; presumably this makes the remaining components exit.
if err := c.Close(); err != nil {
return fmt.Errorf("failed to stop cri service: %w", err)
}

// Wait for the event monitor to finish. NOTE(review): if the select above
// already consumed from this channel, this relies on the monitor closing (or
// writing to) the channel after Close — confirm in eventMonitor.start.
if err := <-eventMonitorErrCh; err != nil {
eventMonitorErr = err
}
logrus.Info("Event monitor stopped")
// Returns once the stream server goroutine has closed its channel.
if err := <-streamServerErrCh; err != nil {
streamServerErr = err
}
logrus.Info("Stream server stopped")
// Report the first error found, in a fixed priority order.
if eventMonitorErr != nil {
return fmt.Errorf("event monitor error: %w", eventMonitorErr)
}
if streamServerErr != nil {
return fmt.Errorf("stream server error: %w", streamServerErr)
}
if cniNetConfMonitorErr != nil {
return fmt.Errorf("cni network conf monitor error: %w", cniNetConfMonitorErr)
}

return nil
}

流程架构图整理

CRI-work-flow