kubernetes源码分析系列之源码分析系列之Statefulset Controller
对于分析
Controller
源码选用StatefulSet Controller
来,其它控制器源码分析一个套路,可以做参考。
StatefulSet
简介
此篇文章默认你已经具备了熟练使用 Statefulset
的基础知识,所以常规介绍及使用 Demo
的描述不在阐述,具体可参考 StatefulSet 基础
StatefulSet Controller
启动分析
kube-manager-controller
入口调用链分析
对于 Kubernetes
的源码组织结构不做过多介绍,希望你有一定的了解。
对于 k8s
是如何启动 kube-controller-manager
,可以通过文档kube-controller-manager
查找到对应如下内容:
1 | --controllers strings 默认值:[*] |
这里我们发现默认值启动中已经加入了 statefulset
的初始化,那么在代码是在哪里体现的呢?继续往下看。
进入 cmd/controller-manager
的 main
函数,实际上就做两个事情:
1 | func main() { |
通过函数调用关系,我们进入 cmd/kube-controller-manager/controllermanager.go
中,查看 Run
执行究竟做了啥。
1 | ... |
深入 KnowControllers()
函数分析:
1 | // KnownControllers returns all known controllers's name |
通过 NewControllerInitializers
可以知道的是真正执行 controller-manager
初始化的执行函数是这个。
1 | // NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func) |
通过如上可以得知 StatefulSet
是如何被初始化到 kube-controller-manager
中的。
Statefulset Controller
启动过程
通过 cmd/kube-manager-controller/app/controllermanager.go
中 Run
函数分析,其中
1 | ... |
进入 Run
函数进行分析:
1 | // Run runs the KubeControllerManagerOptions. This should never exit. |
查看 StartControllers
函数:
1 | // StartControllers starts a set of controllers with a specified ControllerContext |
1 | // 用于判断具体的 `controller` 是否满足接口需求来得到 `controller manager` 支持的特性 |
看到这里我们似乎还是没看到 StatefulSet Controller
真正执行的地方,请再次回顾下我们之前 NewControllerInitializers
中的内容:
1 | func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc { |
感觉看到了一丝丝曙光,继续往下看 startStatefulSetController
的具体实现。
1 | func startStatefulSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { |
看到这里我们终于知道了 StatefulSet Controller
真正意义上是被如何启动的了。
StatefulSet Controller
明细分析
通过如上的分析,下面就到了 StatefulSet Controller
具体的范畴了。
StatefulSetController
1 | type StatefulSetController struct { |
通过对 StatefulSetController
结构体的大纲,了解下大概的结构:
NewStatefulSetController
对于 ssc
的构造函数分析:
1 | func NewStatefulSetController( |
ControllerRevision
这里了解下 ControllerRevision
究竟是啥,为啥需要关注。
1 | // ControllerRevision implements an immutable snapshot of state data. Clients |
由对应的注释可以知道:ControllerRevision
提供给 DaemonSet
和 StatefulSet
用作更新和回滚,ControllerRevision
存放的是数据的快照,ControllerRevision
生成之后内容是不可修改的,由调用端来负责序列化写入和反序列化读取。其中 Revision(int64)
字段相当于 ControllerRevision
的版本 id
号,Data字段则存放序列化后的数据。
所以 Sts
的更新以及回滚是基于新旧 ControllerRevision
的对比来进行的。
NewDefaultStatefulSetControl
深入看下 NewDefaultStatefulSetControl
定义:
1 | // NewDefaultStatefulSetControl returns a new instance of the default implementation StatefulSetControlInterface that |
Run
函数执行过程
1 | // Run runs the statefulset controller. |
此处关注下 wait.Until
工具:
1 | // Until loops until stop channel is closed, running f every period. |
通过注释可以知道, Until
工具会根据 channel
的关闭来周期性的执行函数 f
。
主要解决的是当我们执行完某些操作后,还需要等待其他资源执行的情况,例如对于有依赖条件的资源释放的时候,A
依赖于 B
,那么对 A
资源释放的时候还需要对 B
资源的释放进行观望。这在 k8s
的资源操作场景是常见的。
继续关注 wait.Until
中包的函数 ssc.worker
。
1 | // worker runs a worker goroutine that invokes processNextWorkItem until the controller's queue is closed |
worker
通过运行一个 goroutine
来处理 processNextWorkItem
直到 controller
相关的 queue
被关闭。
毫无疑问,需要分析 processNextWorkItem()
对应函数:
1 | // processNextWorkItem dequeues items, processes them, and marks them done. It enforces that the syncHandler is never |
processNextWorkItem()
主要用于对 queue
的元素出队,并标记为已处理。
1 | // sync syncs the given statefulset. |
syncStatefulSet
:
1 | // syncStatefulSet syncs a tuple of (statefulset, []*v1.Pod). |
UpdateStatefulSet
:
- 获取历史
revisions
; - 计算
currentRevision
和updateRevision
,若sts
处于更新过程中则currentRevision
和updateRevision
值不同; - 调用
ssc.performUpdate
执行实际的sync
操作; - 调用
ssc.updateStatefulSetStatus
更新status subResource
; - 根据
sts
的spec.revisionHistoryLimit
字段清理过期的controllerrevision
;
1 | func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) { |
updateStatefulSet
:
// 作为updateStatefulSet的核心方法,重试保障Statefulset到达期望状态,update策略主要分为三类:
// 1.RollingUpdateStatefulSetStrategyType
// 2.OnDeleteStatefulSetStrategyType
// 3.PartitionStatefulSetStrategyType
func (ssc *defaultStatefulSetControl) updateStatefulSet(
set *apps.StatefulSet,
currentRevision *apps.ControllerRevision,
updateRevision *apps.ControllerRevision,
collisionCount int32,
pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
...
// 获取当前和更新的 Revision
currentSet, err := ApplyRevision(set, currentRevision)
...
// 构建 sts 对象
currentSet, err := ApplyRevision(set, currentRevision)
...
// 构建 sts 对象
updateSet, err := ApplyRevision(set, updateRevision)
...
// status 赋值
status := apps.StatefulSetStatus{}
status.ObservedGeneration = set.Generation
status.CurrentRevision = currentRevision.Name
status.UpdateRevision = updateRevision.Name
status.CollisionCount = new(int32)
*status.CollisionCount = collisionCount
// replicas 存放 Pods such that 0 <= getOrdinal(pod) < set.Spec.Replicas
replicas := make([]*v1.Pod, replicaCount)
// condemned 存放 Pods such that set.Spec.Replicas <= getOrdinal(pod)
condemned := make([]*v1.Pod, 0, len(pods))
...
// 对 pods 进行处理分别存放到 replicas 和 condemned 切片中
for i := range pods {
status.Replicas++
// 统计 running 和 ready 的副本数
if isRunningAndReady(pods[i]) {
status.ReadyReplicas++
// 对门控是否开启特性的判断
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) {
if isRunningAndAvailable(pods[i], set.Spec.MinReadySeconds) {
status.AvailableReplicas++
}
} else {
// 如果门控特性未开启,那么所有ready 的副本数将被认为是可用状态的副本数
status.AvailableReplicas = status.ReadyReplicas
}
}
// 统计 current 和 update 的副本数
if isCreated(pods[i]) && !isTerminating(pods[i]) {
if getPodRevision(pods[i]) == currentRevision.Name {
status.CurrentReplicas++
}
if getPodRevision(pods[i]) == updateRevision.Name {
status.UpdatedReplicas++
}
}
if ord := getOrdinal(pods[i]); 0 <= ord && ord < replicaCount {
// replicas 的赋值
replicas[ord] = pods[i]
} else if ord >= replicaCount {
// condemned 的赋值
condemned = append(condemned, pods[i])
}
}
// 检查 replicas数组中 [0,set.Spec.Replicas) 下标是否有缺失的 pod,若有缺失的则创建对应的 pod object
// 在 newVersionedStatefulSetPod 中会判断是使用 currentSet 还是 updateSet 来创建
for ord := 0; ord < replicaCount; ord++ {
if replicas[ord] == nil {
replicas[ord] = newVersionedStatefulSetPod(
currentSet,
updateSet,
currentRevision.Name,
updateRevision.Name, ord)
}
}
// 对 condemned 数组进行排序
sort.Sort(ascendingOrdinal(condemned))
// 根据 ord 在 replicas 和 condemned 数组中找出 first unhealthy Pod
for i := range replicas {
if !isHealthy(replicas[i]) {
unhealthy++
if firstUnhealthyPod == nil {
firstUnhealthyPod = replicas[i]
}
}
}
for i := range condemned {
if !isHealthy(condemned[i]) {
unhealthy++
if firstUnhealthyPod == nil {
firstUnhealthyPod = condemned[i]
}
}
}
if unhealthy > 0 {
klog.V(4).Infof("StatefulSet %s/%s has %d unhealthy Pods starting with %s",
set.Namespace,
set.Name,
unhealthy,
firstUnhealthyPod.Name)
}
// 判断 set 是否处于 deleting
if set.DeletionTimestamp != nil {
return &status, nil
}
// 默认设置为非并行模式
monotonic := !allowsBurst(set)
// 确保 replicas 数组中的所有 pod 都是 running 状态
for i := range replicas {
// 删除和重建失败的 pods
if isFailed(replicas[i]) {
ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
"StatefulSet %s/%s is recreating failed Pod %s",
set.Namespace,
set.Name,
replicas[i].Name)
if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
return &status, err
}
if getPodRevision(replicas[i]) == currentRevision.Name {
status.CurrentReplicas--
}
if getPodRevision(replicas[i]) == updateRevision.Name {
status.UpdatedReplicas--
}
status.Replicas--
replicas[i] = newVersionedStatefulSetPod(
currentSet,
updateSet,
currentRevision.Name,
updateRevision.Name,
i)
}
// 如果 pod 未被创建则进行创建
if !isCreated(replicas[i]) {
if err := ssc.podControl.CreateStatefulPod(set, replicas[i]); err != nil {
return &status, err
}
status.Replicas++
if getPodRevision(replicas[i]) == currentRevision.Name {
status.CurrentReplicas++
}
if getPodRevision(replicas[i]) == updateRevision.Name {
status.UpdatedReplicas++
}
// if the set does not allow bursting, return immediately
if monotonic {
return &status, nil
}
// pod created, no more work possible for this round
continue
}
// 当 pod 处于 terminating 状态的时候且不允许并行的情况下 则进行等待删除完成
if isTerminating(replicas[i]) && monotonic {
klog.V(4).Infof(
"StatefulSet %s/%s is waiting for Pod %s to Terminate",
set.Namespace,
set.Name,
replicas[i].Name)
return &status, nil
}
// 当 pod 已经被创建且不运行并行的情况下,状态并不是 running 和 ready 状态的处理。
if !isRunningAndReady(replicas[i]) && monotonic {
klog.V(4).Infof(
"StatefulSet %s/%s is waiting for Pod %s to be Running and Ready",
set.Namespace,
set.Name,
replicas[i].Name)
return &status, nil
}
// pod creates 成功但是并不是可用状态时的处理。
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic {
klog.V(4).Infof(
"StatefulSet %s/%s is waiting for Pod %s to be Available",
set.Namespace,
set.Name,
replicas[i].Name)
return &status, nil
}
// 对 sts 的唯一性及相关存储唯一性的检查
if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) {
continue
}
...
}
}
至此结束。