继上篇kubernetes源码分析系列之kube-scheduler-1 了解了大概的入口流程之后,此篇文章重点分析下 pkg/scheduler
包中的内容,scheduler初始化及运行流程分析。
在sig-scheduling 中可以了解到关于scheduler
设计的目的。
首先我们面对的问题还是如何找到分析的入口 ,鉴于kubernetes源码分析系列之kube-scheduler-1 得知在 cmd/kube-scheduler/scheduler.go
中关于 scheduler
的创建在setUp
函数中,其中有一段代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 // Create the scheduler. sched, err := scheduler.New(cc.Client, cc.InformerFactory, cc.PodInformer, recorderFactory, ctx.Done(), scheduler.WithProfiles(cc.ComponentConfig.Profiles...), scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource), scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore), scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry), scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds), scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds), scheduler.WithExtenders(cc.ComponentConfig.Extenders...), )
那么话不多说,这里我们从 pkg/scheduler/scheduler.go
进行分析不过有一说一,kubernetes
源码的注释已经做的很nice了,我们可以通过通读源码就理解了作者的意图了
pkg/scheduler/scheduler.go
以下只对关键性的内容做分析。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 // 通过监控未调度的 `pods`,通过与 `api server`交互实现发现合适的节点进行调度绑定 type Scheduler struct { // 根据 `SchedulerCache` 缓存发生的变化可以被 NodeLister 监听到 SchedulerCache internalcache.Cache Algorithm core.ScheduleAlgorithm // NextPod 必须为方法,为了保证直到下一个`pod`为可调度状态实现阻塞功能 // 这里并没有使用 `channel`是因为调度`pod`可能会花费一段时间,另外我们也不期望当 // `pods` 在 `channel` 中时变成‘陈旧’状态 NextPod func() *framework.QueuedPodInfo // Error is called if there is an error. It is passed the pod in // question, and the error Error func(*framework.QueuedPodInfo, error) // Close this to shut down the scheduler. StopEverything <-chan struct{} // SchedulingQueue 队列用于存放被调度的 `pods` SchedulingQueue internalqueue.SchedulingQueue // Profiles are the scheduling profiles. Profiles profile.Map scheduledPodsHasSynced func() bool client clientset.Interface }
至此我们继续回到 scheduler.New{***}
的部分,继续了解在 pkg/scheduler/scheduler.go
中究竟做了哪些事情。
New函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 // New returns a Scheduler func New(client clientset.Interface, informerFactory informers.SharedInformerFactory, podInformer coreinformers.PodInformer, recorderFactory profile.RecorderFactory, stopCh <-chan struct{}, opts ...Option) (*Scheduler, error) { stopEverything := stopCh if stopEverything == nil { stopEverything = wait.NeverStop } options := defaultSchedulerOptions for _, opt := range opts { opt(&options) } schedulerCache := internalcache.New(30*time.Second, stopEverything) // 这里需要注意下,此处的 `plugins` 注册是针对于 `in-tree` 中的插件,如果我们根据 `schduler framework` 进行的自定义插件,不是通过这边注册 而是通过 `frameworkOutOfTreeRegistry` 进行注册,此阶段将 `in-tree` 及 `out-of-tree` 中的 `plugins` 进行合并 registry := frameworkplugins.NewInTreeRegistry() if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil { return nil, err } // 初始化 `snapshot`, `snapshot` 为 `cache` 中关于节点信息以及节点树书序。在调度生命周期初始阶段使用`snapshot`,并且在此周期内使用 snapshot := internalcache.NewEmptySnapshot() // @2 初始化 `Configurator`,用于构造 `scheduler` configurator := &Configurator{ client: client, recorderFactory: recorderFactory, informerFactory: informerFactory, podInformer: podInformer, schedulerCache: schedulerCache, StopEverything: stopEverything, percentageOfNodesToScore: options.percentageOfNodesToScore, podInitialBackoffSeconds: options.podInitialBackoffSeconds, podMaxBackoffSeconds: options.podMaxBackoffSeconds, profiles: append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...), registry: registry, nodeInfoSnapshot: snapshot, extenders: options.extenders, frameworkCapturer: options.frameworkCapturer, } `metrics` 注册 metrics.Register() var sched *Scheduler // 进行调度算法逻辑处理 source := options.schedulerAlgorithmSource switch { case source.Provider != nil: // Create the config from a named algorithm provider. //@3 sc, err := configurator.createFromProvider(*source.Provider) if err != nil { return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err) } sched = sc case source.Policy != nil: // Create the config from a user specified policy source. policy := &schedulerapi.Policy{} switch { case source.Policy.File != nil: if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil { return nil, err } case source.Policy.ConfigMap != nil: if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil { return nil, err } } // Set extenders on the configurator now that we've decoded the policy // In this case, c.extenders should be nil since we're using a policy (and therefore not componentconfig, // which would have set extenders in the above instantiation of Configurator from CC options) configurator.extenders = policy.Extenders sc, err := configurator.createFromConfig(*policy) if err != nil { return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err) } sched = sc default: return nil, fmt.Errorf("unsupported algorithm source: %v", source) } // Additional tweaks to the config produced by the configurator. sched.StopEverything = stopEverything sched.client = client sched.scheduledPodsHasSynced = podInformer.Informer().HasSynced addAllEventHandlers(sched, informerFactory, podInformer) return sched, nil }
分析到现在实际上关于scheduler
初始化做的具体工作结束了,那么下面就到了cmd/kube-scheduler/server.go
中的 Run
函数了,然后最终会调用 pkg/scheduler/scheduler.go
中的 Run
函数。
1 2 3 4 5 6 7 8 9 func (sched *Scheduler) Run(ctx context.Context) { if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) { return } //@1 sched.SchedulingQueue.Run() wait.UntilWithContext(ctx, sched.scheduleOne, 0) sched.SchedulingQueue.Close() }
通过以上我们发现 Scheduler
结构体中的 SchedulingQueue
出现的很频繁,这也是需要重点分析的部分,题外话到了看到Queue
,让我再一次加深了打牢基础的重要性,不还是数据结构的东西么
SchedulingQueue
ShcedulingQueue
此接口主要用通过队列实现对等待调度状态的Pods
进行存放,此接口有点类似于cache.FIFO
,cache.Heap
的模式,同时便于使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 type SchedulingQueue interface { framework.PodNominator Add(pod *v1.Pod) erro) // 将不可调度的`pods`放回调度队列中, AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error // 调度循环返回当前调度队列缓存中的数目,通常来讲,当`pod`被弹出进行数目的递增 SchedulingCycle() int64 // 方法`Pop`移除队列的首部元素,当队列为空直到有新的元素添加到队列的时候一直处于阻塞状态 Pop() (*framework.QueuedPodInfo, error) Update(oldPod, newPod *v1.Pod) error Delete(pod *v1.Pod) error MoveAllToActiveOrBackoffQueue(event string) AssignedPodAdded(pod *v1.Pod) AssignedPodUpdated(pod *v1.Pod) PendingPods() []*v1.Pod // Close closes the SchedulingQueue so that the goroutine which is // waiting to pop items can exit gracefully. Close() // NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue. NumUnschedulablePods() int // Run starts the goroutines managing the queue. Run() }
看到这边我们知道通过定义 SchedulingQueue
接口抽象了调度队列的框架,那么具体的实现是分析的重点。
还记得kubernetes源码分析系列之kube-scheduler-1 中我们讲到了Run
服务运行,其中有段代码如下:
1 2 // Leader election is disabled, so runCommand inline until done. sched.Run(ctx)
这边还是调用了sched
的Run
方法,注意这边个人对sched
这个的命名的理解是scheduler daemon
,这边应该是一个daemon
进程。 最终又回到了@1的调用逻辑,实际上就是SchedulingQueue
的Run
方法,此时我们还未看到真正执行调度队列Run
方法的承载体,实际上在pkg/scheduler/internal/quque/scheduling_queue.go
中在声明 SchedulingQueue
接口之后,下文就进行了具体的实现, 即使我们能够确定就是这么个逻辑,但是对于程序来说是如何感知的呢?
1 2 3 4 // @4 func NewSchedulingQueue(lessFn framework.LessFunc, opts ...Option) SchedulingQueue { return NewPriorityQueue(lessFn, opts...) }
请注意这边函数初始化得到一个优先队列,优先队列的返回即为SchedulingQueue
我们继续分析函数NewPriorityQueue(lessFn,opts...)
,根据如下函数签名:
1 2 3 4 5 // NewPriorityQueue creates a PriorityQueue object. func NewPriorityQueue( lessFn framework.LessFunc, opts ...Option, ) *PriorityQueue
可以得知返回值就是一个*PriorityQueue
指针,那么至此就说明了按照SchedulingQueue
接口实现的结构体PriorityQueue
就是最终的承载体。
到了这边我们实际上还有个问题没有理清,就是那是在什么时候进行NewSchedulingQueue
初始化的操作的呢?
我们继续回顾以上内容,重点注意下@2部分初始化的Configurator
以及configurator.createFromProvider
方法。Configurator
主要用于初始化构造Scheduler
,具体的定义是在pkg/scheduler/factory.go
中
createFromProvider
1 2 3 4 5 6 // 根据注册的算法`provider` 创建`scheduler` func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) { ..... // 通过调用 `Configurator.create{...}`方法得到 `Scheduler` return c.create() }
继续往下分析 c.create()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 // 根据注册的插件创建 `scheduler` func (c *Configurator) create() (*Scheduler, error) { //其他细节业务逻辑 ..... //这里构造得到的`podQueue` 不就是上文@4的`NewSchedulingQueue`函数么? podQueue := internalqueue.NewSchedulingQueue( lessFn, internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second), internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second), internalqueue.WithPodNominator(nominator), ) ..... return &Scheduler{ SchedulerCache: c.schedulerCache, Algorithm: algo, Profiles: profiles, NextPod: internalqueue.MakeNextPodFunc(podQueue), Error: MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache), StopEverything: c.StopEverything, SchedulingQueue: podQueue, }, nil }
至此关于SchedulingQueue
相关的流程梳理打通明白所以然了。
流程梳理 老规矩以上基本上都是围绕着代码进行梳理,对于最后的总结归纳通过流程图是最直接了当的,下面上图。