继上篇kubernetes源码分析系列之kube-scheduler-1 了解了大概的入口流程之后,此篇文章重点分析下 pkg/scheduler 包中的内容,scheduler初始化及运行流程分析。

sig-scheduling 中可以了解到关于scheduler设计的目的。

首先我们面对的问题还是如何找到分析的入口,鉴于kubernetes源码分析系列之kube-scheduler-1 得知在 cmd/kube-scheduler/scheduler.go 中关于 scheduler 的创建在setUp函数中,其中有一段代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Create the scheduler.
sched, err := scheduler.New(cc.Client,
cc.InformerFactory,
cc.PodInformer,
recorderFactory,
ctx.Done(),
scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
)

那么话不多说,这里我们从 pkg/scheduler/scheduler.go 进行分析
不过有一说一,kubernetes源码的注释已经做的很nice了,我们可以通过通读源码就理解了作者的意图了

pkg/scheduler/scheduler.go

以下只对关键性的内容做分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 通过监控未调度的 `pods`,通过与 `api server`交互实现发现合适的节点进行调度绑定
type Scheduler struct {
// 根据 `SchedulerCache` 缓存发生的变化可以被 NodeLister 监听到
SchedulerCache internalcache.Cache

Algorithm core.ScheduleAlgorithm

// NextPod 必须为方法,为了保证直到下一个`pod`为可调度状态实现阻塞功能
// 这里并没有使用 `channel`是因为调度`pod`可能会花费一段时间,另外我们也不期望当
// `pods` 在 `channel` 中时变成‘陈旧’状态
NextPod func() *framework.QueuedPodInfo

// Error is called if there is an error. It is passed the pod in
// question, and the error
Error func(*framework.QueuedPodInfo, error)

// Close this to shut down the scheduler.
StopEverything <-chan struct{}

// SchedulingQueue 队列用于存放被调度的 `pods`
SchedulingQueue internalqueue.SchedulingQueue

// Profiles are the scheduling profiles.
Profiles profile.Map

scheduledPodsHasSynced func() bool

client clientset.Interface
}

至此我们继续回到 scheduler.New{***} 的部分,继续了解在 pkg/scheduler/scheduler.go 中究竟做了哪些事情。

New函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// New returns a Scheduler
func New(client clientset.Interface,
informerFactory informers.SharedInformerFactory,
podInformer coreinformers.PodInformer,
recorderFactory profile.RecorderFactory,
stopCh <-chan struct{},
opts ...Option) (*Scheduler, error) {

stopEverything := stopCh
if stopEverything == nil {
stopEverything = wait.NeverStop
}

options := defaultSchedulerOptions
for _, opt := range opts {
opt(&options)
}

schedulerCache := internalcache.New(30*time.Second, stopEverything)
// 这里需要注意下,此处的 `plugins` 注册是针对于 `in-tree` 中的插件,如果我们根据 `schduler framework` 进行的自定义插件,不是通过这边注册 而是通过 `frameworkOutOfTreeRegistry` 进行注册,此阶段将 `in-tree` 及 `out-of-tree` 中的 `plugins` 进行合并
registry := frameworkplugins.NewInTreeRegistry()
if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
return nil, err
}

// 初始化 `snapshot`, `snapshot` 为 `cache` 中关于节点信息以及节点树书序。在调度生命周期初始阶段使用`snapshot`,并且在此周期内使用
snapshot := internalcache.NewEmptySnapshot()

// @2 初始化 `Configurator`,用于构造 `scheduler`
configurator := &Configurator{
client: client,
recorderFactory: recorderFactory,
informerFactory: informerFactory,
podInformer: podInformer,
schedulerCache: schedulerCache,
StopEverything: stopEverything,
percentageOfNodesToScore: options.percentageOfNodesToScore,
podInitialBackoffSeconds: options.podInitialBackoffSeconds,
podMaxBackoffSeconds: options.podMaxBackoffSeconds,
profiles: append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...),
registry: registry,
nodeInfoSnapshot: snapshot,
extenders: options.extenders,
frameworkCapturer: options.frameworkCapturer,
}

`metrics` 注册
metrics.Register()

var sched *Scheduler
// 进行调度算法逻辑处理
source := options.schedulerAlgorithmSource
switch {
case source.Provider != nil:
// Create the config from a named algorithm provider.
//@3
sc, err := configurator.createFromProvider(*source.Provider)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
}
sched = sc
case source.Policy != nil:
// Create the config from a user specified policy source.
policy := &schedulerapi.Policy{}
switch {
case source.Policy.File != nil:
if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
return nil, err
}
case source.Policy.ConfigMap != nil:
if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
return nil, err
}
}
// Set extenders on the configurator now that we've decoded the policy
// In this case, c.extenders should be nil since we're using a policy (and therefore not componentconfig,
// which would have set extenders in the above instantiation of Configurator from CC options)
configurator.extenders = policy.Extenders
sc, err := configurator.createFromConfig(*policy)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
}
sched = sc
default:
return nil, fmt.Errorf("unsupported algorithm source: %v", source)
}
// Additional tweaks to the config produced by the configurator.
sched.StopEverything = stopEverything
sched.client = client
sched.scheduledPodsHasSynced = podInformer.Informer().HasSynced

addAllEventHandlers(sched, informerFactory, podInformer)
return sched, nil
}

分析到现在实际上关于scheduler 初始化做的具体工作结束了,那么下面就到了cmd/kube-scheduler/server.go 中的 Run 函数了,然后最终会调用 pkg/scheduler/scheduler.go 中的 Run 函数。

1
2
3
4
5
6
7
8
9
func (sched *Scheduler) Run(ctx context.Context) {
if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
return
}
//@1
sched.SchedulingQueue.Run()
wait.UntilWithContext(ctx, sched.scheduleOne, 0)
sched.SchedulingQueue.Close()
}

通过以上我们发现 Scheduler 结构体中的 SchedulingQueue 出现的很频繁,这也是需要重点分析的部分,题外话到了看到Queue,让我再一次加深了打牢基础的重要性,不还是数据结构的东西么

SchedulingQueue

ShcedulingQueue 此接口主要用通过队列实现对等待调度状态的Pods进行存放,此接口有点类似于cache.FIFO,cache.Heap的模式,同时便于使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type SchedulingQueue interface {
framework.PodNominator
Add(pod *v1.Pod) erro)

// 将不可调度的`pods`放回调度队列中,
AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error

// 调度循环返回当前调度队列缓存中的数目,通常来讲,当`pod`被弹出进行数目的递增
SchedulingCycle() int64

// 方法`Pop`移除队列的首部元素,当队列为空直到有新的元素添加到队列的时候一直处于阻塞状态
Pop() (*framework.QueuedPodInfo, error)

Update(oldPod, newPod *v1.Pod) error
Delete(pod *v1.Pod) error
MoveAllToActiveOrBackoffQueue(event string)
AssignedPodAdded(pod *v1.Pod)
AssignedPodUpdated(pod *v1.Pod)
PendingPods() []*v1.Pod
// Close closes the SchedulingQueue so that the goroutine which is
// waiting to pop items can exit gracefully.
Close()
// NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
NumUnschedulablePods() int
// Run starts the goroutines managing the queue.
Run()
}

看到这边我们知道通过定义 SchedulingQueue 接口抽象了调度队列的框架,那么具体的实现是分析的重点。

还记得kubernetes源码分析系列之kube-scheduler-1中我们讲到了Run服务运行,其中有段代码如下:

1
2
// Leader election is disabled, so runCommand inline until done.
sched.Run(ctx)

这边还是调用了schedRun方法,注意这边个人对sched这个的命名的理解是scheduler daemon,这边应该是一个daemon进程。
最终又回到了@1的调用逻辑,实际上就是SchedulingQueueRun方法,此时我们还未看到真正执行调度队列Run方法的承载体,实际上在pkg/scheduler/internal/quque/scheduling_queue.go中在声明 SchedulingQueue接口之后,下文就进行了具体的实现,
即使我们能够确定就是这么个逻辑,但是对于程序来说是如何感知的呢?

1
2
3
4
// @4
func NewSchedulingQueue(lessFn framework.LessFunc, opts ...Option) SchedulingQueue {
return NewPriorityQueue(lessFn, opts...)
}

请注意这边函数初始化得到一个优先队列,优先队列的返回即为SchedulingQueue

我们继续分析函数NewPriorityQueue(lessFn,opts...),根据如下函数签名:

1
2
3
4
5
// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(
lessFn framework.LessFunc,
opts ...Option,
) *PriorityQueue

可以得知返回值就是一个*PriorityQueue指针,那么至此就说明了按照SchedulingQueue接口实现的结构体PriorityQueue就是最终的承载体。

到了这边我们实际上还有个问题没有理清,就是那是在什么时候进行NewSchedulingQueue初始化的操作的呢?

我们继续回顾以上内容,重点注意下@2部分初始化的Configurator以及configurator.createFromProvider方法。
Configurator主要用于初始化构造Scheduler,具体的定义是在pkg/scheduler/factory.go

createFromProvider

1
2
3
4
5
6
// 根据注册的算法`provider` 创建`scheduler`
func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) {
.....
// 通过调用 `Configurator.create{...}`方法得到 `Scheduler`
return c.create()
}

继续往下分析 c.create()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 根据注册的插件创建 `scheduler`
func (c *Configurator) create() (*Scheduler, error) {
//其他细节业务逻辑
.....
//这里构造得到的`podQueue` 不就是上文@4的`NewSchedulingQueue`函数么?
podQueue := internalqueue.NewSchedulingQueue(
lessFn,
internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
internalqueue.WithPodNominator(nominator),
)
.....
return &Scheduler{
SchedulerCache: c.schedulerCache,
Algorithm: algo,
Profiles: profiles,
NextPod: internalqueue.MakeNextPodFunc(podQueue),
Error: MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
StopEverything: c.StopEverything,
SchedulingQueue: podQueue,
}, nil
}

至此关于SchedulingQueue相关的流程梳理打通明白所以然了。

流程梳理

老规矩以上基本上都是围绕着代码进行梳理,对于最后的总结归纳通过流程图是最直接了当的,下面上图。
流程图