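To analyze controller source code, this article uses the StatefulSet Controller as its example. The other controllers follow the same pattern, so this walkthrough can serve as a reference for them too.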

StatefulSet Overview

This article assumes you are already comfortable using StatefulSet, so the usual introduction and usage demos are not repeated here; see StatefulSet Basics for those.

StatefulSet Controller Startup Analysis

kube-controller-manager entry-point call chain

This article does not go into the layout of the Kubernetes source tree; some familiarity with it is assumed.

To see how k8s starts the controllers in kube-controller-manager, the kube-controller-manager reference documentation lists the following:

--controllers strings     Default: [*]
A list of controllers to enable. * enables all on-by-default controllers; foo enables the controller named foo; -foo disables the controller named foo.
All controllers: attachdetach, bootstrapsigner, cloud-node-lifecycle, clusterrole-aggregation, cronjob, csrapproving, csrcleaner, csrsigning, daemonset, deployment, disruption, endpoint, endpointslice, endpointslicemirroring, ephemeral-volume, garbagecollector, horizontalpodautoscaling, job, namespace, nodeipam, nodelifecycle, persistentvolume-binder, persistentvolume-expander, podgc, pv-protection, pvc-protection, replicaset, replicationcontroller, resourcequota, root-ca-cert-publisher, route, service, serviceaccount, serviceaccount-token, statefulset, tokencleaner, ttl, ttl-after-finished
Disabled-by-default controllers: bootstrapsigner and tokencleaner.
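The enable and disable rules of `--controllers` can be sketched in Go. This is a stdlib-only illustration built on our own assumptions. The name `isControllerEnabled` and its parameters are hypothetical: they model the documented semantics and do not reproduce the upstream signature. An explicit `foo` or `-foo` entry wins. Otherwise, `*` enables every controller that is not disabled by default.

```go
package main

import "fmt"

// isControllerEnabled is a hypothetical helper modelling the --controllers flag:
// "foo" enables foo, "-foo" disables foo, and "*" enables every controller that
// is not disabled by default. The first explicit entry for the name wins.
func isControllerEnabled(name string, flags []string, disabledByDefault map[string]bool) bool {
	hasStar := false
	for _, f := range flags {
		switch f {
		case name:
			return true
		case "-" + name:
			return false
		case "*":
			hasStar = true
		}
	}
	// No explicit choice for this controller: fall back to the "*" default.
	if !hasStar {
		return false
	}
	return !disabledByDefault[name]
}

func main() {
	disabled := map[string]bool{"bootstrapsigner": true, "tokencleaner": true}
	flags := []string{"*", "-ttl"} // the flag's default value is just "*"
	for _, c := range []string{"statefulset", "ttl", "tokencleaner"} {
		fmt.Printf("%s enabled=%v\n", c, isControllerEnabled(c, flags, disabled))
	}
}
```

With the default `[*]`, statefulset is enabled without being named, which is why nothing has to be configured for the StatefulSet controller to start.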

So the default value already includes statefulset. Where is that reflected in the code? Read on.

The main function under cmd/kube-controller-manager really does only two things:

func main() {
    command := app.NewControllerManagerCommand() // initialization
    code := cli.Run(command) // actual execution
    os.Exit(code)
}

Following the call chain into cmd/kube-controller-manager/app/controllermanager.go, let's see what Run actually does:

...
Run: func(cmd *cobra.Command, args []string) {
    verflag.PrintAndExitIfRequested()
    cliflag.PrintFlags(cmd.Flags())

    err := checkNonZeroInsecurePort(cmd.Flags())
    if err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }

    // This is the point to focus on: KnownControllers() returns the names of all known controllers that need to be initialized
    c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
    if err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }

    if err := Run(c.Complete(), wait.NeverStop); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
},
...

Digging into KnownControllers():

// KnownControllers returns all known controllers's name
// This is exactly the controller list that the daemon's --controllers flag refers to, returned as a slice of controller names.
func KnownControllers() []string {
    ret := sets.StringKeySet(NewControllerInitializers(IncludeCloudLoops))

    // add "special" controllers that aren't initialized normally. These controllers cannot be initialized
    // using a normal function. The only known special case is the SA token controller which *must* be started
    // first to ensure that the SA tokens for future controllers will exist. Think very carefully before adding
    // to this list.
    ret.Insert(
        saTokenControllerName,
    )

    return ret.List()
}

NewControllerInitializers is the function that actually builds the controller-manager's initialization table: a map from each controller's name to its start function.

// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)
// paired to their InitFunc. This allows for structured downstream composition and subdivision.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
    controllers := map[string]InitFunc{}

    // This is where the StatefulSet controller's start function is registered in the map
    controllers["statefulset"] = startStatefulSetController

    if loopMode == IncludeCloudLoops {
        controllers["service"] = startServiceController
        controllers["route"] = startRouteController
        controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
        // TODO: volume controller into the IncludeCloudLoops only set.
    }
    controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
    controllers["attachdetach"] = startAttachDetachController
    controllers["persistentvolume-expander"] = startVolumeExpandController
    controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
    controllers["pvc-protection"] = startPVCProtectionController
    controllers["pv-protection"] = startPVProtectionController
    controllers["ttl-after-finished"] = startTTLAfterFinishedController
    controllers["root-ca-cert-publisher"] = startRootCACertPublisher
    controllers["ephemeral-volume"] = startEphemeralVolumeController
    if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) &&
        utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) {
        controllers["storage-version-gc"] = startStorageVersionGCController
    }

    return controllers
}

This shows how StatefulSet gets registered in kube-controller-manager.
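The following minimal sketch makes the relationship between NewControllerInitializers and KnownControllers concrete. It assumes a tiny, made-up set of controllers and a simplified InitFunc that takes no arguments. It shows two ideas from the code above:

- Cloud-dependent controllers are registered only in the include-cloud-loops mode.
- The list of known names is the sorted key set of that map plus the special service-account token controller. That controller's name, `serviceaccount-token`, is taken from the flag documentation.

```go
package main

import (
	"fmt"
	"sort"
)

// InitFunc is a simplified stand-in: it only reports whether the controller
// started and any error (the real one also takes a context and ControllerContext).
type InitFunc func() (started bool, err error)

// loopMode is a hypothetical mirror of ControllerLoopMode.
type loopMode int

const (
	includeCloudLoops loopMode = iota
	externalLoops
)

func startNoop() (bool, error) { return true, nil }

// newControllerInitializers mirrors the shape of NewControllerInitializers with
// a small illustrative subset of controllers.
func newControllerInitializers(mode loopMode) map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["statefulset"] = startNoop
	if mode == includeCloudLoops {
		// Cloud-dependent controllers are only registered in this mode.
		controllers["service"] = startNoop
		controllers["route"] = startNoop
	}
	controllers["pv-protection"] = startNoop
	return controllers
}

// knownControllers mirrors KnownControllers: the sorted key set of the
// initializer map plus the special service-account token controller.
func knownControllers() []string {
	names := []string{"serviceaccount-token"}
	for name := range newControllerInitializers(includeCloudLoops) {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func main() {
	fmt.Println(knownControllers())
	fmt.Println(len(newControllerInitializers(externalLoops)), "controllers without cloud loops")
}
```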

StatefulSet Controller startup process

Back in the Run function of cmd/kube-controller-manager/app/controllermanager.go, note the following:

...
Run: func(cmd *cobra.Command, args []string) {
    verflag.PrintAndExitIfRequested()
    cliflag.PrintFlags(cmd.Flags())

    err := checkNonZeroInsecurePort(cmd.Flags())
    if err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }

    c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
    if err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }

    // Complete the controller manager's config, then Run the controllers it manages
    if err := Run(c.Complete(), wait.NeverStop); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
},
...

Stepping into the Run function:

// Run runs the KubeControllerManagerOptions.  This should never exit.
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
    // To help debugging, immediately log version
    klog.Infof("Version: %+v", version.Get())
    ...
    clientBuilder, rootClientBuilder := createClientBuilders(c)

    saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController

    run := func(ctx context.Context, startSATokenController InitFunc, initializersFunc ControllerInitializersFunc) { // This is the controller initialization we care about

        controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
        if err != nil {
            klog.Fatalf("error building controller context: %v", err)
        }
        controllerInitializers := initializersFunc(controllerContext.LoopMode)
        // This is where the managed controllers are actually started
        if err := StartControllers(ctx, controllerContext, startSATokenController, controllerInitializers, unsecuredMux, healthzHandler); err != nil {
            klog.Fatalf("error starting controllers: %v", err)
        }

        controllerContext.InformerFactory.Start(stopCh)
        controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh)
        close(controllerContext.InformersStarted)

        select {}
    }
    ...
    select {}
}

Now look at the StartControllers function:

// StartControllers starts a set of controllers with a specified ControllerContext
func StartControllers(ctx context.Context, controllerCtx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc,
    unsecuredMux *mux.PathRecorderMux, healthzHandler *controllerhealthz.MutableHealthzHandler) error {
    ...
    // Loop over the initialized controllers map; the part to watch is what initFn actually does
    for controllerName, initFn := range controllers {
        if !controllerCtx.IsControllerEnabled(controllerName) {
            klog.Warningf("%q is disabled", controllerName)
            continue
        }

        time.Sleep(wait.Jitter(controllerCtx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))

        klog.V(1).Infof("Starting %q", controllerName)
        ctrl, started, err := initFn(ctx, controllerCtx)
        if err != nil {
            klog.Errorf("Error starting %q", controllerName)
            return err
        }
        if !started {
            klog.Warningf("Skipping %q", controllerName)
            continue
        }
        ...
        klog.Infof("Started %q", controllerName)
    }

    healthzHandler.AddHealthChecker(controllerChecks...)

    return nil
}
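// InitFunc launches a specific controller; the returned controller can optionally implement extra interfaces so that the controller manager can support the features it requests
type InitFunc func(ctx context.Context, controllerCtx ControllerContext) (controller controller.Interface, enabled bool, err error)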

We still have not seen where the StatefulSet Controller actually runs. Recall what NewControllerInitializers contains:

func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
    ...
    controllers["statefulset"] = startStatefulSetController
    ...
}

Next is the implementation of startStatefulSetController.

func startStatefulSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
    go statefulset.NewStatefulSetController(
        // The following informers cover the types the Sts deals with directly
        controllerContext.InformerFactory.Core().V1().Pods(),
        controllerContext.InformerFactory.Apps().V1().StatefulSets(),
        controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),
        controllerContext.InformerFactory.Apps().V1().ControllerRevisions(),
        controllerContext.ClientBuilder.ClientOrDie("statefulset-controller"),
    ).Run(int(controllerContext.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs), ctx.Done())
    return nil, true, nil
}

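Now we know how the StatefulSet Controller is started: startStatefulSetController builds it and runs it in a goroutine with ConcurrentStatefulSetSyncs workers.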

StatefulSet Controller in detail

With the startup path covered, we now move into the StatefulSet Controller itself.

StatefulSetController

type StatefulSetController struct {
    // client interface
    kubeClient clientset.Interface
    // control returns an interface capable of syncing a stateful set.
    // Abstracted out for testing.
    control StatefulSetControlInterface
    // podControl is used for patching pods.
    podControl controller.PodControlInterface
    // podLister is able to list/get pods from a shared informer's store
    podLister corelisters.PodLister
    // podListerSynced returns true if the pod shared informer has synced at least once
    podListerSynced cache.InformerSynced
    // setLister is able to list/get stateful sets from a shared informer's store
    setLister appslisters.StatefulSetLister
    // setListerSynced returns true if the stateful set shared informer has synced at least once
    setListerSynced cache.InformerSynced
    // pvcListerSynced returns true if the pvc shared informer has synced at least once
    pvcListerSynced cache.InformerSynced
    // revListerSynced returns true if the rev shared informer has synced at least once
    revListerSynced cache.InformerSynced
    // StatefulSets that need to be synced.
    queue workqueue.RateLimitingInterface
}

The outline of the StatefulSetController struct, referred to as ssc below, gives a rough picture of its structure:
[Figure: ssc struct outline]

NewStatefulSetController

Analyzing the ssc constructor:

func NewStatefulSetController(
    // These are all the resource objects ssc cares about: Pod/Sts/PVC/Revision
    podInformer coreinformers.PodInformer,
    setInformer appsinformers.StatefulSetInformer,
    pvcInformer coreinformers.PersistentVolumeClaimInformer,
    revInformer appsinformers.ControllerRevisionInformer,
    kubeClient clientset.Interface,
) *StatefulSetController {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartStructuredLogging(0)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})
    ssc := &StatefulSetController{
        kubeClient: kubeClient,
        control: NewDefaultStatefulSetControl(
            NewRealStatefulPodControl(
                kubeClient,
                setInformer.Lister(),
                podInformer.Lister(),
                pvcInformer.Lister(),
                recorder),
            NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()),
            history.NewHistory(kubeClient, revInformer.Lister()),
            recorder,
        ),
        pvcListerSynced: pvcInformer.Informer().HasSynced,
        queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
        podControl: controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},

        revListerSynced: revInformer.Informer().HasSynced,
    }

    // Handlers called when Pods managed by the Sts are created, updated or deleted
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        // lookup the statefulset and enqueue
        AddFunc: ssc.addPod,
        // lookup current and old statefulset if labels changed
        UpdateFunc: ssc.updatePod,
        // lookup statefulset accounting for deletion tombstones
        DeleteFunc: ssc.deletePod,
    })
    ssc.podLister = podInformer.Lister()
    ssc.podListerSynced = podInformer.Informer().HasSynced

    // Handlers called when the Sts itself is created, updated or deleted
    setInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc: ssc.enqueueStatefulSet,
            UpdateFunc: func(old, cur interface{}) {
                oldPS := old.(*apps.StatefulSet)
                curPS := cur.(*apps.StatefulSet)
                if oldPS.Status.Replicas != curPS.Status.Replicas {
                    klog.V(4).Infof("Observed updated replica count for StatefulSet: %v, %d->%d", curPS.Name, oldPS.Status.Replicas, curPS.Status.Replicas)
                }
                ssc.enqueueStatefulSet(cur)
            },
            DeleteFunc: ssc.enqueueStatefulSet,
        },
    )
    ssc.setLister = setInformer.Lister()
    ssc.setListerSynced = setInformer.Informer().HasSynced

    // TODO: Watch volumes
    return ssc
}
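The event-handler wiring above boils down to one thing: every StatefulSet add, update and delete becomes a `namespace/name` key on a work queue. The sketch below models that with stdlib types only. `StatefulSet`, `handlerFuncs` and the de-duplicating `queue` are hypothetical stand-ins for the apps/v1 type, `cache.ResourceEventHandlerFuncs` and the client-go workqueue. Deletion tombstones are ignored.

```go
package main

import "fmt"

// StatefulSet is a hypothetical, trimmed stand-in for apps/v1 StatefulSet.
type StatefulSet struct {
	Namespace, Name string
	StatusReplicas  int32
}

// handlerFuncs mimics the shape of cache.ResourceEventHandlerFuncs.
type handlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}

// queue is a toy FIFO that ignores keys which are already queued.
type queue struct {
	items []string
	seen  map[string]bool
}

func (q *queue) Add(key string) {
	if q.seen[key] {
		return
	}
	q.seen[key] = true
	q.items = append(q.items, key)
}

// keyFunc builds the "namespace/name" key used by the work queue.
func keyFunc(s *StatefulSet) string { return s.Namespace + "/" + s.Name }

// newSetHandlers wires StatefulSet events to the queue, as
// NewStatefulSetController does with ssc.enqueueStatefulSet.
func newSetHandlers(q *queue) handlerFuncs {
	enqueue := func(obj interface{}) { q.Add(keyFunc(obj.(*StatefulSet))) }
	return handlerFuncs{
		AddFunc: enqueue,
		UpdateFunc: func(oldObj, newObj interface{}) {
			oldSet, curSet := oldObj.(*StatefulSet), newObj.(*StatefulSet)
			if oldSet.StatusReplicas != curSet.StatusReplicas {
				fmt.Printf("observed replica change for %s: %d->%d\n",
					curSet.Name, oldSet.StatusReplicas, curSet.StatusReplicas)
			}
			enqueue(newObj) // every update is enqueued, not only replica changes
		},
		DeleteFunc: enqueue,
	}
}

func main() {
	q := &queue{seen: map[string]bool{}}
	h := newSetHandlers(q)
	web := &StatefulSet{Namespace: "default", Name: "web", StatusReplicas: 1}
	h.AddFunc(web)
	h.UpdateFunc(web, &StatefulSet{Namespace: "default", Name: "web", StatusReplicas: 2})
	fmt.Println(q.items) // the key is queued once despite two events
}
```

Because only keys are queued, the worker always re-reads the latest object from the lister instead of acting on a possibly stale event payload.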

ControllerRevision

Let's look at what a ControllerRevision actually is, and why the controller needs to watch it.

// ControllerRevision implements an immutable snapshot of state data. Clients
// are responsible for serializing and deserializing the objects that contain
// their internal state.
// Once a ControllerRevision has been successfully created, it can not be updated.
// The API Server will fail validation of all requests that attempt to mutate
// the Data field. ControllerRevisions may, however, be deleted. Note that, due to its use by both
// the DaemonSet and StatefulSet controllers for update and rollback, this object is beta. However,
// it may be subject to name and representation changes in future releases, and clients should not
// depend on its stability. It is primarily for internal use by controllers.
type ControllerRevision struct {
    metav1.TypeMeta `json:",inline"`
    // Standard object's metadata.
    // More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
    // +optional
    metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

    // Data is the serialized representation of the state.
    Data runtime.RawExtension `json:"data,omitempty" protobuf:"bytes,2,opt,name=data"`

    // Revision indicates the revision of the state represented by Data.
    Revision int64 `json:"revision" protobuf:"varint,3,opt,name=revision"`
}

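From its comments we can conclude:
ControllerRevision is used by DaemonSet and StatefulSet for updates and rollbacks. A ControllerRevision stores a snapshot of data. Once it has been created, its data cannot be modified. The client is responsible for serializing the data when writing it and deserializing it when reading it. The Revision (int64) field acts as the ControllerRevision's version number, and the Data field holds the serialized data.
Sts updates and rollbacks are therefore driven by comparing the old and new ControllerRevisions.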

NewDefaultStatefulSetControl

A closer look at the definition of NewDefaultStatefulSetControl:

// NewDefaultStatefulSetControl returns a new instance of the default implementation StatefulSetControlInterface that
// implements the documented semantics for StatefulSets. podControl is the PodControlInterface used to create, update,
// and delete Pods and to create PersistentVolumeClaims. statusUpdater is the StatefulSetStatusUpdaterInterface used
// to update the status of StatefulSets. You should use an instance returned from NewRealStatefulPodControl() for any
// scenario other than testing.
func NewDefaultStatefulSetControl(
    // Interface for managing the Pods that belong to the Sts
    podControl StatefulPodControlInterface,
    // Interface for updating the Sts status
    statusUpdater StatefulSetStatusUpdaterInterface,
    // Interface for managing ControllerRevisions
    controllerHistory history.Interface,
    // Event recorder interface
    recorder record.EventRecorder) StatefulSetControlInterface {
    return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory, recorder}
}
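Because the collaborators are passed in as interfaces, tests can inject fakes instead of talking to an API server. The struct comment earlier notes that the control interface is "Abstracted out for testing".

The sketch below shows that idea with two hypothetical, heavily trimmed interfaces. The real StatefulPodControlInterface, StatefulSetStatusUpdaterInterface and history.Interface have different and larger method sets.

```go
package main

import "fmt"

// Hypothetical, heavily trimmed interfaces standing in for the collaborators
// that NewDefaultStatefulSetControl receives.
type podControl interface {
	CreatePod(name string) error
}

type statusUpdater interface {
	UpdateStatus(replicas int) error
}

// defaultControl depends only on the interfaces, never on concrete clients.
type defaultControl struct {
	pods   podControl
	status statusUpdater
}

// Sync creates the missing pods "<set>-<ordinal>" and then records the count.
func (c *defaultControl) Sync(set string, want, have int) error {
	for i := have; i < want; i++ {
		if err := c.pods.CreatePod(fmt.Sprintf("%s-%d", set, i)); err != nil {
			return err
		}
	}
	return c.status.UpdateStatus(want)
}

// fakePods and fakeStatus record calls instead of talking to an API server.
type fakePods struct{ created []string }

func (f *fakePods) CreatePod(name string) error {
	f.created = append(f.created, name)
	return nil
}

type fakeStatus struct{ last int }

func (f *fakeStatus) UpdateStatus(replicas int) error {
	f.last = replicas
	return nil
}

func main() {
	pods, status := &fakePods{}, &fakeStatus{}
	ctrl := &defaultControl{pods: pods, status: status}
	if err := ctrl.Sync("web", 3, 1); err != nil {
		fmt.Println("sync failed:", err)
		return
	}
	fmt.Println(pods.created, status.last)
}
```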

Run function execution flow

// Run runs the statefulset controller.
func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer ssc.queue.ShutDown()

    klog.Infof("Starting stateful set controller")
    defer klog.Infof("Shutting down statefulset controller")

    if !cache.WaitForNamedCacheSync("stateful set", stopCh, ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.Until(ssc.worker, time.Second, stopCh)
    }

    <-stopCh
}

Note the wait.Until helper used here:

// Until loops until stop channel is closed, running f every period.
//
// Until is syntactic sugar on top of JitterUntil with zero jitter factor and
// with sliding = true (which means the timer for period starts after the f
// completes).
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
    JitterUntil(f, period, 0.0, true, stopCh)
}

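As the comment says, Until runs the function f every period until the stop channel is closed. Because sliding is true, each period is measured from the moment f returns.
It is mainly useful when, after finishing some operations, we still have to keep waiting on other resources. Take resources with dependencies: if A depends on B, releasing A also requires keeping an eye on B until it is released. Such scenarios are common in k8s resource operations.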

Next, look at ssc.worker, the function wrapped by wait.Until:

// worker runs a worker goroutine that invokes processNextWorkItem until the controller's queue is closed
func (ssc *StatefulSetController) worker() {
    for ssc.processNextWorkItem() {
    }
}

Each worker is started in its own goroutine by Run. It keeps calling processNextWorkItem until the controller's queue is shut down.

Naturally, the next step is to analyze processNextWorkItem():

// processNextWorkItem dequeues items, processes them, and marks them done. It enforces that the syncHandler is never
// invoked concurrently with the same key.
func (ssc *StatefulSetController) processNextWorkItem() bool {
    key, quit := ssc.queue.Get()
    if quit {
        return false
    }
    defer ssc.queue.Done(key)
    // The rest is easy to follow; the call to focus on is ssc.sync()
    if err := ssc.sync(key.(string)); err != nil {
        utilruntime.HandleError(fmt.Errorf("error syncing StatefulSet %v, requeuing: %v", key.(string), err))
        ssc.queue.AddRateLimited(key)
    } else {
        ssc.queue.Forget(key)
    }
    return true
}

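processNextWorkItem() takes the next key off the queue, syncs it, and marks it done with queue.Done. If the sync fails, the key is requeued with rate limiting (AddRateLimited). If it succeeds, queue.Forget clears the key's rate-limiting history.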

// sync syncs the given statefulset.
func (ssc *StatefulSetController) sync(key string) error {
    startTime := time.Now()
    defer func() {
        klog.V(4).Infof("Finished syncing statefulset %q (%v)", key, time.Since(startTime))
    }()

    // Split the cache key into namespace and name
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }

    // Get the Sts from the informer cache by namespace and name
    set, err := ssc.setLister.StatefulSets(namespace).Get(name)
    if errors.IsNotFound(err) {
        klog.Infof("StatefulSet has been deleted %v", key)
        return nil
    }
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err))
        return err
    }

    // Convert the Sts label selector
    selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err))
        // This is a non-transient error, so don't retry.
        return nil
    }

    // ssc.adoptOrphanRevisions checks for orphaned ControllerRevisions: those matching the selector are adopted by adding an ownerReference; owned ones whose labels no longer match are released.
    if err := ssc.adoptOrphanRevisions(set); err != nil {
        return err
    }

    // ssc.getPodsForStatefulSet uses the selector to get the Pods associated with the Sts: orphan Pods whose labels match are adopted, and owned Pods whose labels no longer match are released.
    pods, err := ssc.getPodsForStatefulSet(set, selector)
    if err != nil {
        return err
    }

    // Perform the actual sync
    return ssc.syncStatefulSet(set, pods)
}
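Two helpers carry most of the logic in sync: splitting the queue key, and matching Pods against the selector. The sketch mirrors the documented behaviour of cache.SplitMetaNamespaceKey and models adoption and release with a matchLabels-only selector. The `Pod` type with a plain `Owner` string is a hypothetical simplification of ownerReferences. The real claim logic also performs extra checks before adopting, for example whether the set is being deleted.

```go
package main

import (
	"fmt"
	"strings"
)

// splitMetaNamespaceKey follows the documented behaviour of
// cache.SplitMetaNamespaceKey: "ns/name" -> (ns, name), "name" -> ("", name),
// anything else is an error.
func splitMetaNamespaceKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil
	case 2:
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

// Pod is a hypothetical trimmed pod; Owner == "" means the pod is an orphan.
type Pod struct {
	Name   string
	Labels map[string]string
	Owner  string
}

// matches reports whether labels satisfy a matchLabels-only selector.
func matches(selector, labels map[string]string) bool {
	for k, v := range selector {
		if labels[k] != v {
			return false
		}
	}
	return true
}

// claimPods adopts matching orphans, releases owned pods that no longer match,
// keeps owned matching pods and ignores pods owned by someone else.
func claimPods(set string, selector map[string]string, pods []*Pod) []*Pod {
	var claimed []*Pod
	for _, p := range pods {
		switch {
		case p.Owner == set && matches(selector, p.Labels):
			claimed = append(claimed, p)
		case p.Owner == set:
			p.Owner = "" // release
		case p.Owner == "" && matches(selector, p.Labels):
			p.Owner = set // adopt
			claimed = append(claimed, p)
		}
	}
	return claimed
}

func main() {
	ns, name, err := splitMetaNamespaceKey("default/web")
	fmt.Println(ns, name, err)
	pods := []*Pod{{Name: "web-0", Labels: map[string]string{"app": "web"}}}
	for _, p := range claimPods("web", map[string]string{"app": "web"}, pods) {
		fmt.Println("claimed", p.Name, "owner:", p.Owner)
	}
}
```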

syncStatefulSet

// syncStatefulSet syncs a tuple of (statefulset, []*v1.Pod).
func (ssc *StatefulSetController) syncStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {
    klog.V(4).Infof("Syncing StatefulSet %v/%v with %d pods", set.Namespace, set.Name, len(pods))
    var status *apps.StatefulSetStatus
    var err error
    // TODO: investigate where we mutate the set during the update as it is not obvious.
    // syncStatefulSet simply delegates the work to ssc.control.UpdateStatefulSet.
    status, err = ssc.control.UpdateStatefulSet(set.DeepCopy(), pods)
    if err != nil {
        return err
    }
    klog.V(4).Infof("Successfully synced StatefulSet %s/%s successful", set.Namespace, set.Name)
    // One more sync to handle the clock skew. This is also helping in requeuing right after status update
    if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && set.Spec.MinReadySeconds > 0 && status != nil && status.AvailableReplicas != *set.Spec.Replicas {
        ssc.enqueueSSAfter(set, time.Duration(set.Spec.MinReadySeconds)*time.Second)
    }

    return nil
}

UpdateStatefulSet

  1. Fetch the revision history;
  2. Compute currentRevision and updateRevision; while the sts is being updated, currentRevision and updateRevision differ;
  3. Call ssc.performUpdate to carry out the actual sync;
  4. Call ssc.updateStatefulSetStatus to update the status subresource;
  5. Clean up expired ControllerRevisions according to the sts's spec.revisionHistoryLimit.
func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
    // Get the revisions and sort them
    revisions, err := ssc.ListRevisions(set)
    if err != nil {
        return nil, err
    }
    history.SortControllerRevisions(revisions)

    // Compute the current/update revisions and perform the update
    currentRevision, updateRevision, status, err := ssc.performUpdate(set, pods, revisions)
    if err != nil {
        return nil, utilerrors.NewAggregate([]error{err, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)})
    }

    // Truncate expired history revisions
    return status, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
}

func (ssc *defaultStatefulSetControl) performUpdate(
    set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) {
    var currentStatus *apps.StatefulSetStatus
    // get the current, and update revisions
    currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions)
    if err != nil {
        return currentRevision, updateRevision, currentStatus, err
    }

    // Perform the actual update
    currentStatus, err = ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods)
    if err != nil {
        return currentRevision, updateRevision, currentStatus, err
    }
    // update status
    err = ssc.updateStatefulSetStatus(set, currentStatus)
    if err != nil {
        return currentRevision, updateRevision, currentStatus, err
    }
    klog.V(4).Infof("StatefulSet %s/%s pod status replicas=%d ready=%d current=%d updated=%d",
        set.Namespace,
        set.Name,
        currentStatus.Replicas,
        currentStatus.ReadyReplicas,
        currentStatus.CurrentReplicas,
        currentStatus.UpdatedReplicas)

    klog.V(4).Infof("StatefulSet %s/%s revisions current=%s update=%s",
        set.Namespace,
        set.Name,
        currentStatus.CurrentRevision,
        currentStatus.UpdateRevision)

    return currentRevision, updateRevision, currentStatus, nil
}

updateStatefulSet

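// updateStatefulSet is the core of UpdateStatefulSet: across repeated syncs it drives the StatefulSet toward its desired state. Update strategies fall into three main kinds:
// 1. RollingUpdateStatefulSetStrategyType
// 2. OnDeleteStatefulSetStrategyType
// 3. Partitioned rolling update (RollingUpdate with rollingUpdate.partition set)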
func (ssc *defaultStatefulSetControl) updateStatefulSet(
    set *apps.StatefulSet,
    currentRevision *apps.ControllerRevision,
    updateRevision *apps.ControllerRevision,
    collisionCount int32,
    pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
    ...
    // Build the StatefulSet objects for the current and the update revision
    currentSet, err := ApplyRevision(set, currentRevision)
    ...
    updateSet, err := ApplyRevision(set, updateRevision)
    ...
    // Populate the status
    status := apps.StatefulSetStatus{}
    status.ObservedGeneration = set.Generation
    status.CurrentRevision = currentRevision.Name
    status.UpdateRevision = updateRevision.Name
    status.CollisionCount = new(int32)
    *status.CollisionCount = collisionCount

    // replicas holds Pods such that 0 <= getOrdinal(pod) < set.Spec.Replicas
    replicas := make([]*v1.Pod, replicaCount)
    // condemned holds Pods such that set.Spec.Replicas <= getOrdinal(pod)
    condemned := make([]*v1.Pod, 0, len(pods))
    ...
    // Sort the pods into the replicas and condemned slices
    for i := range pods {
        status.Replicas++

        // Count running and ready replicas
        if isRunningAndReady(pods[i]) {
            status.ReadyReplicas++
            // Check whether the StatefulSetMinReadySeconds feature gate is enabled
            if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) {
                if isRunningAndAvailable(pods[i], set.Spec.MinReadySeconds) {
                    status.AvailableReplicas++
                }
            } else {
                // If the feature gate is disabled, every ready replica counts as available
                status.AvailableReplicas = status.ReadyReplicas
            }
        }

        // Count current and updated replicas
        if isCreated(pods[i]) && !isTerminating(pods[i]) {
            if getPodRevision(pods[i]) == currentRevision.Name {
                status.CurrentReplicas++
            }
            if getPodRevision(pods[i]) == updateRevision.Name {
                status.UpdatedReplicas++
            }
        }

        if ord := getOrdinal(pods[i]); 0 <= ord && ord < replicaCount {
            // Place the pod into replicas at its ordinal
            replicas[ord] = pods[i]

        } else if ord >= replicaCount {
            // Ordinal beyond the desired count: the pod is condemned
            condemned = append(condemned, pods[i])
        }
    }
    // Check whether any Pod is missing at indexes [0, set.Spec.Replicas) of replicas; if so, create the corresponding Pod object
    // newVersionedStatefulSetPod decides whether to build it from currentSet or updateSet
    for ord := 0; ord < replicaCount; ord++ {
        if replicas[ord] == nil {
            replicas[ord] = newVersionedStatefulSetPod(
                currentSet,
                updateSet,
                currentRevision.Name,
                updateRevision.Name, ord)
        }
    }

    // Sort condemned by ascending ordinal
    sort.Sort(ascendingOrdinal(condemned))


    // Find the first unhealthy Pod, by ordinal, across replicas and condemned
    for i := range replicas {
        if !isHealthy(replicas[i]) {
            unhealthy++
            if firstUnhealthyPod == nil {
                firstUnhealthyPod = replicas[i]
            }
        }
    }

    for i := range condemned {
        if !isHealthy(condemned[i]) {
            unhealthy++
            if firstUnhealthyPod == nil {
                firstUnhealthyPod = condemned[i]
            }
        }
    }

    if unhealthy > 0 {
        klog.V(4).Infof("StatefulSet %s/%s has %d unhealthy Pods starting with %s",
            set.Namespace,
            set.Name,
            unhealthy,
            firstUnhealthyPod.Name)
    }


    // If the set is being deleted, stop here
    if set.DeletionTimestamp != nil {
        return &status, nil
    }

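    // monotonic is true unless the pod management policy allows bursting (Parallel); the default OrderedReady policy is non-parallel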
    monotonic := !allowsBurst(set)

    // Examine each Pod in replicas so that all of them end up running
    for i := range replicas {
        // Delete and recreate failed pods
        if isFailed(replicas[i]) {
            ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
                "StatefulSet %s/%s is recreating failed Pod %s",
                set.Namespace,
                set.Name,
                replicas[i].Name)
            if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
                return &status, err
            }
            if getPodRevision(replicas[i]) == currentRevision.Name {
                status.CurrentReplicas--
            }
            if getPodRevision(replicas[i]) == updateRevision.Name {
                status.UpdatedReplicas--
            }
            status.Replicas--
            replicas[i] = newVersionedStatefulSetPod(
                currentSet,
                updateSet,
                currentRevision.Name,
                updateRevision.Name,
                i)
        }

        // If the pod has not been created yet, create it
        if !isCreated(replicas[i]) {
            if err := ssc.podControl.CreateStatefulPod(set, replicas[i]); err != nil {
                return &status, err
            }
            status.Replicas++
            if getPodRevision(replicas[i]) == currentRevision.Name {
                status.CurrentReplicas++
            }
            if getPodRevision(replicas[i]) == updateRevision.Name {
                status.UpdatedReplicas++
            }

            // if the set does not allow bursting, return immediately
            if monotonic {
                return &status, nil
            }
            // pod created, no more work possible for this round
            continue
        }

        // If the pod is terminating and bursting is not allowed, wait for the deletion to finish
        if isTerminating(replicas[i]) && monotonic {
            klog.V(4).Infof(
                "StatefulSet %s/%s is waiting for Pod %s to Terminate",
                set.Namespace,
                set.Name,
                replicas[i].Name)
            return &status, nil
        }

        // If the pod has been created but is not Running and Ready, and bursting is not allowed, wait for it
        if !isRunningAndReady(replicas[i]) && monotonic {
            klog.V(4).Infof(
                "StatefulSet %s/%s is waiting for Pod %s to be Running and Ready",
                set.Namespace,
                set.Name,
                replicas[i].Name)
            return &status, nil
        }

        // The pod was created successfully but is not yet available (MinReadySeconds): wait for it
        if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic {
            klog.V(4).Infof(
                "StatefulSet %s/%s is waiting for Pod %s to be Available",
                set.Namespace,
                set.Name,
                replicas[i].Name)
            return &status, nil
        }

        // Check the pod's identity and storage against the sts; if both match, move on to the next pod
        if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) {
            continue
        }
        ...
    }
}
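The ordinal bookkeeping in updateStatefulSet can be sketched with the standard library. The regular expression for `<set>-<ordinal>` pod names and the helper names below are assumptions for illustration. The sketch covers three things:

- splitting Pods into replicas and condemned;
- the monotonic (OrderedReady) rule: at most one missing Pod is created per round, and only after every lower ordinal is Ready;
- the Parallel rule: all missing Pods are created at once.

Failed-Pod recreation, terminating Pods and MinReadySeconds are left out.

```go
package main

import (
	"fmt"
	"regexp"
	"sort"
	"strconv"
)

// ordinalRe matches "<set>-<ordinal>" pod names (assumed naming scheme).
var ordinalRe = regexp.MustCompile(`(.*)-([0-9]+)$`)

// getOrdinal returns the ordinal encoded in a pod name, or -1 if there is none.
func getOrdinal(podName string) int {
	m := ordinalRe.FindStringSubmatch(podName)
	if m == nil {
		return -1
	}
	n, err := strconv.Atoi(m[2])
	if err != nil {
		return -1
	}
	return n
}

// partition places pods with 0 <= ord < replicaCount into replicas (indexed by
// ordinal, "" where missing) and pods with ord >= replicaCount into condemned,
// sorted by ascending ordinal.
func partition(podNames []string, replicaCount int) (replicas, condemned []string) {
	replicas = make([]string, replicaCount)
	for _, name := range podNames {
		if ord := getOrdinal(name); 0 <= ord && ord < replicaCount {
			replicas[ord] = name
		} else if ord >= replicaCount {
			condemned = append(condemned, name)
		}
	}
	sort.Slice(condemned, func(i, j int) bool { return getOrdinal(condemned[i]) < getOrdinal(condemned[j]) })
	return replicas, condemned
}

// nextCreates returns the missing ordinals to create in this round. In
// monotonic (OrderedReady) mode it stops at the first pod that is not Ready and
// creates at most one pod; in Parallel mode it creates every missing pod.
func nextCreates(replicas []string, ready map[string]bool, monotonic bool) []int {
	var creates []int
	for ord, name := range replicas {
		if name == "" {
			creates = append(creates, ord)
			if monotonic {
				return creates
			}
			continue
		}
		if monotonic && !ready[name] {
			return creates // wait for this pod to become Running and Ready
		}
	}
	return creates
}

func main() {
	replicas, condemned := partition([]string{"web-0", "web-2", "web-4", "web-3"}, 3)
	fmt.Printf("replicas=%q condemned=%q\n", replicas, condemned)
	ready := map[string]bool{"web-0": true, "web-2": true}
	fmt.Println("OrderedReady creates:", nextCreates(replicas, ready, true))
	fmt.Println("Parallel creates:", nextCreates(replicas, ready, false))
}
```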

That concludes this walkthrough.