This kube-scheduler source code analysis series focuses on walking through the code flow; for documentation of kube-scheduler itself, please see the kube-scheduler introduction.

The analysis below skips detail handling and only follows the main path.

Code entry point analysis

kubernetes/cmd/kube-scheduler/scheduler.go:

func main() {
    rand.Seed(time.Now().UnixNano())

    // @1
    command := app.NewSchedulerCommand()

    // TODO: once we switch everything over to Cobra commands, we can go back to calling
    // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
    // normalize func and add the go flag set by hand.
    pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
    // utilflag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()

    //@2
    if err := command.Execute(); err != nil {
        os.Exit(1)
    }
}
  • @1 app.NewSchedulerCommand() registers the corresponding options for the command line and relies on the cobra library to parse and initialize the Command.
  • @2 command.Execute() is the actual service startup entry point; let's step in:
    // ExecuteContext is the same as Execute(), but sets the ctx on the command.
    // Retrieve ctx by calling cmd.Context() inside your *Run lifecycle functions.
    func (c *Command) ExecuteContext(ctx context.Context) error {
        c.ctx = ctx
        return c.Execute()
    }

    // Execute uses the args (os.Args[1:] by default)
    // and run through the command tree finding appropriate matches
    // for commands and then corresponding flags.
    func (c *Command) Execute() error {
        _, err := c.ExecuteC()
        return err
    }

    func (c *Command) ExecuteC() (cmd *Command, err error) {
        .....
        // initialize help as the last point possible to allow for user
        // overriding
        c.InitDefaultHelpCmd()
        .....
        // initialize the hidden command to be used for bash completion
        c.initCompleteCmd(args)

        // @3
        err = cmd.execute(flags)
        .....
        return cmd, err
    }
    From the function name we can tell that @3 is the part that actually executes. Let's dig deeper:
    func (c *Command) execute(a []string) (err error) {
        .....
        // initialize help and version flag at the last point possible to allow for user
        // overriding
        c.InitDefaultHelpFlag()
        c.InitDefaultVersionFlag()

        .....
        c.preRun()

        .....
        if c.PreRunE != nil {
            if err := c.PreRunE(c, argWoFlags); err != nil {
                return err
            }
        } else if c.PreRun != nil {
            c.PreRun(c, argWoFlags)
        }

        if err := c.validateRequiredFlags(); err != nil {
            return err
        }
        if c.RunE != nil {
            if err := c.RunE(c, argWoFlags); err != nil {
                return err
            }
        } else {
            c.Run(c, argWoFlags)
        }
        if c.PostRunE != nil {
            if err := c.PostRunE(c, argWoFlags); err != nil {
                return err
            }
        } else if c.PostRun != nil {
            c.PostRun(c, argWoFlags)
        }
        .....
    }
    In this library code we see a number of Run-related hooks (PreRun, Run, PostRun and their *E variants). I have not studied this part of cobra in depth, but from the names it is clear that they are the pre- and post-processing steps around the actual run. Since they have little to do with kube-scheduler's core mechanics, a rough understanding is enough; let's not lose sight of the main focus of this analysis.
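To make the ordering concrete, here is a tiny stand-alone cobra command (not from the kube-scheduler tree; the command name and printed strings are purely illustrative) showing that the hooks fire as PreRun, then Run, then PostRun:

package main

import (
    "fmt"
    "os"

    "github.com/spf13/cobra"
)

func main() {
    // Toy command used only to illustrate the hook order applied inside
    // (*Command).execute(); names here are illustrative, not from the
    // kube-scheduler source.
    demoCmd := &cobra.Command{
        Use: "demo",
        PreRun: func(cmd *cobra.Command, args []string) {
            fmt.Println("PreRun: validation / setup before the main logic")
        },
        Run: func(cmd *cobra.Command, args []string) {
            fmt.Println("Run: the main logic (kube-scheduler plugs runCommand in here)")
        },
        PostRun: func(cmd *cobra.Command, args []string) {
            fmt.Println("PostRun: cleanup after the main logic")
        },
    }

    if err := demoCmd.Execute(); err != nil {
        os.Exit(1)
    }
    // Expected output order: PreRun -> Run -> PostRun
}

kube-scheduler itself only sets Run, so the interesting question is what that Run closure contains.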

Now that we know the Command is what ultimately gets invoked, how does kube-scheduler hook its own logic (and its context) into this flow? Let's go back and take another look at app.NewSchedulerCommand():

func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
    .....
    Run: func(cmd *cobra.Command, args []string) {

        if err := runCommand(cmd, opts, registryOptions...); err != nil {
            fmt.Fprintf(os.Stderr, "%v\n", err)
            os.Exit(1)
        }
    },
    .....
}

// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
    .....
    return Run(ctx, cc, sched)
}

// This is where the actual kube-scheduler logic is executed.
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
    .....
}
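A side note on the ctx that Run receives: the concrete wiring inside runCommand changes between releases, but the underlying pattern is a cancellable context tied to process signals, so that Run returns when the scheduler is asked to stop. A minimal sketch of that pattern, where signalContext is a hypothetical helper name and not anything from the source tree:

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "syscall"
)

// signalContext (hypothetical name) derives a context that is cancelled when
// the process receives SIGTERM or SIGINT, which is the general pattern behind
// the ctx that runCommand eventually hands to Run(ctx, cc, sched).
func signalContext() context.Context {
    ctx, cancel := context.WithCancel(context.Background())
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT)
    go func() {
        <-ch
        cancel()
    }()
    return ctx
}

func main() {
    ctx := signalContext()
    fmt.Println("running until SIGTERM/SIGINT ...")
    <-ctx.Done() // Run(ctx, ...) would return around this point
    fmt.Println("context cancelled, shutting down")
}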

Entry startup flow overview

Flow overview
The above covers the main startup flow, following the entry path through which the Command is run. Next we focus on what kube-scheduler actually does during initialization and when it really starts running.

kube-scheduler creation and running

  • Entry point
    The runCommand function:
// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
    .....
    cc, sched, err := Setup(ctx, opts, registryOptions...)
    .....
    return Run(ctx, cc, sched)
}
  • Setup: creation work
// Setup creates a completed config and a scheduler based on the command args and options
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
    if errs := opts.Validate(); len(errs) > 0 {
        return nil, nil, utilerrors.NewAggregate(errs)
    }

    c, err := opts.Config()
    if err != nil {
        return nil, nil, err
    }

    // Obtain the config object
    // Get the completed config
    cc := c.Complete()

    // Initialize outOfTreeRegistry; this is where custom plugins built on the Scheduler Framework
    // get registered. The Scheduler Framework will be covered later.
    outOfTreeRegistry := make(runtime.Registry)
    for _, option := range outOfTreeRegistryOptions {
        if err := option(outOfTreeRegistry); err != nil {
            return nil, nil, err
        }
    }
    // Create the event recorder factory
    recorderFactory := getRecorderFactory(&cc)
    // Initialize the scheduler
    // Create the scheduler.
    sched, err := scheduler.New(cc.Client,
        cc.InformerFactory,
        cc.PodInformer,
        recorderFactory,
        ctx.Done(),
        scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
        scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
        scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
        scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
        scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
        scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
        scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
    )
    if err != nil {
        return nil, nil, err
    }

    return &cc, sched, nil
}

At this point the scheduler initialization is complete. The out-of-tree registry Option mechanism used above is sketched below.
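The outOfTreeRegistry loop in Setup is the hook for custom Scheduler Framework plugins. As a minimal sketch of how that Option mechanism works, here is a self-contained toy that mirrors the shape of Registry/Option/PluginFactory; the real types live in the scheduler framework packages and have richer factory signatures, so the names below only shadow them for illustration:

package main

import "fmt"

// Toy types that mirror (but deliberately do not import) the real
// runtime.Registry / Option / PluginFactory used by Setup above, just to show
// how outOfTreeRegistryOptions feed custom plugins into the registry.
type PluginFactory func() (string, error)

type Registry map[string]PluginFactory

func (r Registry) Register(name string, factory PluginFactory) error {
    if _, exists := r[name]; exists {
        return fmt.Errorf("a plugin named %q already exists", name)
    }
    r[name] = factory
    return nil
}

type Option func(Registry) error

// WithPlugin mimics how an out-of-tree plugin would be handed in via a registry Option.
func WithPlugin(name string, factory PluginFactory) Option {
    return func(registry Registry) error {
        return registry.Register(name, factory)
    }
}

func main() {
    outOfTreeRegistryOptions := []Option{
        WithPlugin("my-example-plugin", func() (string, error) { return "my-example-plugin", nil }),
    }

    // This loop has the same shape as the one in Setup.
    outOfTreeRegistry := make(Registry)
    for _, option := range outOfTreeRegistryOptions {
        if err := option(outOfTreeRegistry); err != nil {
            panic(err)
        }
    }
    fmt.Printf("registered %d out-of-tree plugin(s)\n", len(outOfTreeRegistry))
}

In a real out-of-tree scheduler binary, the equivalent Option is passed into app.NewSchedulerCommand(registryOptions...), which is exactly the registryOptions parameter we saw flow through runCommand into Setup.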

  • Run: running the service
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {

    // Register configz under the given name
    // Configz registration.
    if cz, err := configz.New("componentconfig"); err == nil {
        cz.Set(cc.ComponentConfig)
    } else {
        return fmt.Errorf("unable to register configz: %s", err)
    }

    // Prepare the event broadcaster.
    cc.EventBroadcaster.StartRecordingToSink(ctx.Done())

    // Build the list of health checks
    // Setup healthz checks.
    var checks []healthz.HealthChecker
    // Add the leader-election watchdog check if leader election is enabled
    if cc.ComponentConfig.LeaderElection.LeaderElect {
        checks = append(checks, cc.LeaderElection.WatchDog)
    }

    // Start the healthz and metrics servers
    // Start up the healthz server.
    if cc.InsecureServing != nil {
        separateMetrics := cc.InsecureMetricsServing != nil
        handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
        if err := cc.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil {
            return fmt.Errorf("failed to start healthz server: %v", err)
        }
    }
    if cc.InsecureMetricsServing != nil {
        handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil)
        if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil {
            return fmt.Errorf("failed to start metrics server: %v", err)
        }
    }
    if cc.SecureServing != nil {
        handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
        // TODO: handle stoppedCh returned by c.SecureServing.Serve
        if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
            // fail early for secure handlers, removing the old error loop from above
            return fmt.Errorf("failed to start secure server: %v", err)
        }
    }

    // Start all informers.
    go cc.PodInformer.Informer().Run(ctx.Done())
    cc.InformerFactory.Start(ctx.Done())

    // Wait for all caches to sync before scheduling.
    cc.InformerFactory.WaitForCacheSync(ctx.Done())

    // Run the leader-election logic if the leader-election switch is enabled
    // If leader election is enabled, runCommand via LeaderElector until done and exit.
    if cc.LeaderElection != nil {
        cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
            OnStartedLeading: sched.Run,
            OnStoppedLeading: func() {
                klog.Fatalf("leaderelection lost")
            },
        }
        leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
        if err != nil {
            return fmt.Errorf("couldn't create leader elector: %v", err)
        }

        leaderElector.Run(ctx)

        return fmt.Errorf("lost lease")
    }

    // Run the scheduler directly
    // Leader election is disabled, so runCommand inline until done.
    sched.Run(ctx)
    return fmt.Errorf("finished without leader elect")
}

At this point, kube-scheduler is truly up and running.
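For completeness, here is a minimal sketch of the client-go leader-election pattern that Run relies on. Every concrete value in it (kubeconfig from $KUBECONFIG, the kube-system namespace, the lock name demo-scheduler-lock, the identity demo-instance, and the durations) is a placeholder; the point is only that the lease winner's OnStartedLeading callback is where kube-scheduler plugs in sched.Run, and OnStoppedLeading mirrors the klog.Fatalf above:

package main

import (
    "context"
    "os"
    "time"

    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
    "k8s.io/klog/v2"
)

func main() {
    // Placeholder client setup; kubeconfig path comes from the environment.
    cfg, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
    if err != nil {
        klog.Fatal(err)
    }
    client := kubernetes.NewForConfigOrDie(cfg)

    // A Lease-based resource lock; namespace, name and identity are placeholders.
    lock, err := resourcelock.New(resourcelock.LeasesResourceLock,
        "kube-system", "demo-scheduler-lock",
        client.CoreV1(), client.CoordinationV1(),
        resourcelock.ResourceLockConfig{Identity: "demo-instance"})
    if err != nil {
        klog.Fatal(err)
    }

    leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
        Lock:          lock,
        LeaseDuration: 15 * time.Second,
        RenewDeadline: 10 * time.Second,
        RetryPeriod:   2 * time.Second,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                // kube-scheduler plugs sched.Run in here.
                klog.Info("became leader, starting scheduling loop")
                <-ctx.Done()
            },
            OnStoppedLeading: func() {
                // Mirrors klog.Fatalf("leaderelection lost") in Run above.
                klog.Fatal("lost leadership, exiting")
            },
        },
    })
}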

Scheduler initialization and run flow overview

Initialization and run flow

That wraps up the kube-scheduler entry startup process. This is my personal understanding; corrections are welcome if anything is off.