翻译自Writing Controllers

Writing Controller

Kubernetes Controller 是个“常驻调谐进程”。它除了会“监视”对象的期望状态外,也会“监视”对象的运行状态。会通过发送“指令”尝试着将对象的运行状态更加趋近于期望状态。

如下是个简单的 loop 循环:

1
2
3
4
5
for {
desired := getDesiredState()
current := getCurrentState()
makeChanges(desired, current)
}

Guidelines

当我们写 Controller 的时候,有如下知道方针来帮助我们实现我们想要的结果和表现。

  1. 一次之操作一个元素。 如果你使用 workqueue.Interface,你将能够将一个具体的Resource入队,然后稍后将它们 popworker gofuncs,此处需要保证的是同一时间不能有多个 gofuncs 处理同一个元素。

Controllers 会引发多个 Resource 之前的关联关系(例如 Y 发生改变了 我需要检查 X),但是几乎所有的 Controller 会基于 relationships 将检查X的所有放入到队列中。例如,RepicaSet Controller 需要对正在进行删除 pod 做出反应,但是它需要发现关联的RepicaSets 并且对此做出入队。

  1. Resources 随件排序。当 queue off 多种 resources 的时候,将不会保证这些 resources 的顺序。

“监视” 将会实时的进行更新,即使在明显顺序如“create resource A/X”,“create resource B/Y”,Controller 也许注意到的为 create resource B/Ycreate resource A/X

  1. 水平驱逐而不是边缘驱逐。比如某个 shell 脚本没有一直运行,你的 controller 将再重新运行该 shell 之间“休眠”不确定时间。

如果某个 API 对象出现某个 markertrue,你也无法判断出它是由 false 变成 true 的,你只能知道它当前为 true。即使 API “监视”深受其害,所以你将无法对此看出变化,除非你的 controller 在对象的 status 中记录相关信息。

  1. **SharedInformers**。SharedInformers 提供对具体 resource 的添加、更新、删除事件的钩子。同时提供对共享缓存便利性的函数访问。

使用 https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/client-go/informers/factory.go 工厂方法来保证使用相同的缓存实例。

这可以使的我们不使用 API server, 重复序列化会消耗服务端资源,重复反序列化会消耗控制器资源以及重复缓存会也会消耗控制器资源。

你也许发现其他的机制比如 reflectorsdeltafifos 驱动控制器。 这些已是陈旧的机制,后续我们将构建 SharedInformers。你需要避免在新的控制器中使用这些。

  1. 切勿转换原始对象。 控制器建通过缓存来进行共享,这就意味着如果你转换了 “拷贝” 对象,你将使得其与其他的控制器混肴了。

最常见的就是通过 “浅拷贝” 然后对 map 进行转换导致失败,比如对 Annotations

  1. 二级缓存。许多控制器拥有一级资源和二级资源。一级资源对应着那些我们将要更新 Status 的原始对象资源。二级资源对应着将要管理的对象。

在启用一级更新操作之前使用 framework.WaitFirCacheSync 来等待二级缓存。

  1. 系统中的其他因素。因为你没更新对象并不意味着没有其他人更新对象。

不要忘记当前状态会在任意时刻发生更新–仅仅观察期望的状态是不够的。如果你使用在期望状态下的对象缺失来提示当前状态下的东西被删除,请确保你的可观察代码中没有错误(例如,在你的缓存填充之前进行处理)。

  1. 过滤错误到顶层以保持一直的重新队列。我们采用 workqueue.RateLimitingInterface 来允许简单的排队与合理的回退。

当在排队时,你的主控制器返回结果应该包含 error。当不存在错误时,则应该使用 utilruntime.HandleEroor 并且以返回 nil 代替它。这使的审核人员能够容易地检查错误处理情况,并确信控制器不会丢失它应该重试处理的内容。

  1. **Watches、Informers 将会同步**。他们会定期的将集群中匹配的对象进行 Update 更新。这对于你可能需要对对象采取额外的操作是很好的,但是大多数情况下你知道不会存在较多的额外工作。

你可以通过比对新旧对象中的资源版本来判断他们是否发生变化来决定是否需要进行再次入队处理。如果它们是相同的则跳过重新入队的工作环节。需要你注意的是,如果你曾经在再次入队的时候失败了,应该是失败处理而不是再次入队,并且不要再对它们进行重试。

  1. 如果你的控制器协调的一级资源在其 Status 中支持 ObservedGeneration,请确保其正确的设置为 MetaData 元数据。当两个字段间不匹配的时候进行生成。

这将让客户端知道控制器在处理资源。确保你的控制器是负责此资源的,否则如果你需要通过自己的控制器与其通信,你将需要在资源的Status 中创建一个不同类型的ObservedGeneration

  1. 考虑到资源创建时对其他资源的所有者引用(例如,ReplicaSet 导致创建 Pods)。因此你得确保被控制器管理的资源被删除时那些依赖的子资源能够很好的呗回收处理。关于所有者引用的更多明细,请参考这里

需要特别注意的是,当父资源或者子资源被标机为删除时,你不应该采用子资源。如果你对资源使用了缓存,你最好通过直接的 API 绕过缓存,以防你观察到的某个资源的所有者引用已被更新。所以你可以确保你的控制器不会与垃圾回收期产生竞争。

查看k8s.io/kubernetes/pull/42938获取更多的细节。

Rough Structure

Controller 大概如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
type Controller struct {
// podLister is secondary cache of pods which is used for object lookups
podLister cache.StoreToPodLister

// queue is where incoming work is placed to de-dup and to allow "easy"
// rate limmited requeues on errors
queue workqueue.RateLimitingInterface
}

func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
// don't let panics crash the process
defer utilruntime.HandleCrash()

// make sure the work queue is shutdown which will trigger workers to end
defer c.queue.ShutDown()

glog.Infof("Starting <NAME> controller")

// wait for your secondary caches to fill before starting your work
if !framework.WaitFirCacheSync(stopCh, c.PodStoreSunced) {
return
}

// start up your worker threads based on threadiness. Some controllers
// have multiple kinds of workers
for i:=0;i<threadiness;i++{
// runWorker will loop until "something bad" happens. The .Until will
// then rekick the worker after one second
go wait.Until(c.runWorker, time.Second, stopCh)
}

// wait until we're told to stop
<-stopCh
glog.Infof("Shutting down <NAME> controller")
}

func (c *Controller) runWorker() {
// hot loop until we're told to stop. processNextWorkItem will
// automatically wait until there's work available, so we don't worry
// about secondary waits
for c.processNextWorkItem() {

}
}

// processNextWorkItem deals with one key off the queue. It returns false
// when it's time to quit.
func (c *Controller) processNextWorkItem() bool {
// pull the next work item from queue. It should be a key we use to lookup
// something in a cache
key, quit := c.queue.Get()
if quit {
return false
}

// you always have to indicate to the queue that you've completed a piece of
// work
defer c.queue.Done(key)

// do your work on the key. This method will contains your "do stuff" logic
err := c.syncHandler(key.(string))

if err == nil {
// if you had no error, thll the queue to stop tracking history for your
// key. This will reset things like failuer counts for per-item rate
// limiting
c.queue.Forget(key)
return true
}

// there was a failure so be sure to report it. This method allows for
// pluggable error handling which can be used for things like
// cluster-monitoring
utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))

// since we failed, we should requeue the item to work on later. This
// method will add a backoff to avoid hotlooping on particular items
// (they're probably still not going to work right away) and overall
// controller protection (everything I've done is broken, this controller
// needs to calm down or it can starve other usefull work) cases.
c.queue.AddRateLimited(key)

return true
}