The scheduler is one of the core Kubernetes (k8s) components; its main purpose is to pick a suitable node for each pod and bind the pod to it. The overall flow breaks down into three parts:

  • Fetch the list of unscheduled pods
  • Run a series of scheduling algorithms to pick a suitable node
  • Submit the result to the apiserver and perform the bind
```
For given pod:

    +---------------------------------------------+
    |             Schedulable nodes:              |
    |                                             |
    | +--------+    +--------+      +--------+    |
    | | node 1 |    | node 2 |      | node 3 |    |
    | +--------+    +--------+      +--------+    |
    |                                             |
    +-------------------+-------------------------+
                        |
                        |
                        v
    +-------------------+-------------------------+

    Pred. filters: node 3 doesn't have enough resource

    +-------------------+-------------------------+
                        |
                        |
                        v
    +-------------------+-------------------------+
    |             remaining nodes:                |
    |   +--------+                 +--------+     |
    |   | node 1 |                 | node 2 |     |
    |   +--------+                 +--------+     |
    |                                             |
    +-------------------+-------------------------+
                        |
                        |
                        v
    +-------------------+-------------------------+

    Priority function:  node 1: p=2
                        node 2: p=5

    +-------------------+-------------------------+
                        |
                        |
                        v
            select max{node priority} = node 2
```
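
The diagram compresses the whole algorithm: filter out nodes that fail the predicates, score the rest, and pick the highest-scoring node. A minimal self-contained sketch of that control flow (toy types and a single CPU predicate, not the real scheduler code):

```go
package main

import "fmt"

type node struct {
	name    string
	freeCPU int // millicores left on the node
}

// predicate (Filter): does the node have enough resources for the pod?
func fits(n node, podCPU int) bool { return n.freeCPU >= podCPU }

// priority (Score): prefer nodes with the most free CPU left after placement.
func score(n node, podCPU int) int { return n.freeCPU - podCPU }

func main() {
	nodes := []node{{"node1", 500}, {"node2", 2000}, {"node3", 100}}
	podCPU := 300

	// Filter phase: node3 doesn't have enough resources and drops out.
	var feasible []node
	for _, n := range nodes {
		if fits(n, podCPU) {
			feasible = append(feasible, n)
		}
	}

	// Score phase, then select max{node priority}.
	best := feasible[0]
	for _, n := range feasible[1:] {
		if score(n, podCPU) > score(best, podCPU) {
			best = n
		}
	}
	fmt.Println("bind to", best.name) // bind to node2
}
```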

Main flow analysis

cmd/kube-scheduler/app/server.go

```go
Run: func(cmd *cobra.Command, args []string) {
	if err := runCommand(cmd, opts, registryOptions...); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
},
```

Command-line argument parsing and scheduler creation

cmd/kube-scheduler/app/server.go#runCommand

```go
cc, sched, err := Setup(ctx, opts, registryOptions...)
```

cmd/kube-scheduler/app/server.go#Setup

```go
// Parse the command line and options
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
	// Validate the options
	if errs := opts.Validate(); len(errs) > 0 {
		return nil, nil, utilerrors.NewAggregate(errs)
	}

	// Build the config object
	c, err := opts.Config()
	if err != nil {
		return nil, nil, err
	}

	// Fill in the remaining fields to get a CompletedConfig
	cc := c.Complete()

	// Out-of-tree scheduling plugins built on the scheduler framework
	outOfTreeRegistry := make(runtime.Registry)
	for _, option := range outOfTreeRegistryOptions {
		if err := option(outOfTreeRegistry); err != nil {
			return nil, nil, err
		}
	}

	recorderFactory := getRecorderFactory(&cc)
	// Create the scheduler; the scheduling algorithms are also fixed inside New().
	sched, err := scheduler.New(cc.Client,
		cc.InformerFactory,
		cc.PodInformer,
		recorderFactory,
		ctx.Done(),
		scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
		scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
		scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
	)
	if err != nil {
		return nil, nil, err
	}

	return &cc, sched, nil
}
```
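
The outOfTreeRegistry above is what makes the scheduler framework extensible without forking Kubernetes: plugins passed in through registryOptions are merged with the in-tree registry inside scheduler.New. A hedged sketch of wiring one in, assuming the v1.19-era framework packages (the NoopFilter plugin and its factory are purely illustrative):

```go
package main

import (
	"context"
	"os"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/kubernetes/cmd/kube-scheduler/app"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// NoopFilter is an illustrative out-of-tree Filter plugin that accepts every node.
type NoopFilter struct{}

const Name = "NoopFilter"

func (pl *NoopFilter) Name() string { return Name }

func (pl *NoopFilter) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	return framework.NewStatus(framework.Success) // never rejects a node
}

// New is the plugin factory that ends up in outOfTreeRegistry.
func New(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) {
	return &NoopFilter{}, nil
}

func main() {
	// app.WithPlugin turns (name, factory) into an Option; Setup feeds it into
	// outOfTreeRegistry, which scheduler.New merges with the built-in registry.
	command := app.NewSchedulerCommand(app.WithPlugin(Name, New))
	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}
```

Note that registering the plugin is not enough: it still has to be enabled for a profile in the component config before the framework will call it.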

Registration of the built-in scheduling algorithms

pkg/scheduler/scheduler.go#New()

```go
case source.Provider != nil:
	// Create the config from a named algorithm provider.
	sc, err := configurator.createFromProvider(*source.Provider)
	if err != nil {
		return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
	}
	sched = sc
```

pkg/scheduler/factory.go#createFromProvider()

```go
func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) {
	klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
	// NewRegistry() calls getDefaultConfig() to fetch the default
	// scheduling plugins and registers them.
	r := algorithmprovider.NewRegistry()
	defaultPlugins, exist := r[providerName]
	if !exist {
		return nil, fmt.Errorf("algorithm provider %q is not registered", providerName)
	}

	for i := range c.profiles {
		prof := &c.profiles[i]
		plugins := &schedulerapi.Plugins{}
		plugins.Append(defaultPlugins)
		plugins.Apply(prof.Plugins)
		prof.Plugins = plugins
	}
	return c.create()
}
```
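
Here plugins.Append(defaultPlugins) pulls in the provider defaults, and plugins.Apply(prof.Plugins) then layers each profile's own enabled/disabled lists on top. A simplified, self-contained model of that merge for a single extension point (toy types; the real logic lives in schedulerapi.Plugins):

```go
package main

import "fmt"

type Plugin struct{ Name string }

type PluginSet struct {
	Enabled  []Plugin
	Disabled []Plugin // a "*" entry disables all defaults
}

// apply merges a profile's set onto the defaults: start from the default
// Enabled list, drop anything the profile disabled, append the profile's own.
func apply(defaults, custom PluginSet) PluginSet {
	disabled := map[string]bool{}
	for _, p := range custom.Disabled {
		disabled[p.Name] = true
	}
	var out []Plugin
	if !disabled["*"] {
		for _, p := range defaults.Enabled {
			if !disabled[p.Name] {
				out = append(out, p)
			}
		}
	}
	out = append(out, custom.Enabled...)
	return PluginSet{Enabled: out}
}

func main() {
	defaults := PluginSet{Enabled: []Plugin{{"NodeResourcesFit"}, {"NodePorts"}}}
	custom := PluginSet{
		Disabled: []Plugin{{"NodePorts"}},
		Enabled:  []Plugin{{"MyFilter"}},
	}
	fmt.Println(apply(defaults, custom).Enabled) // [{NodeResourcesFit} {MyFilter}]
}
```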

pkg/scheduler/algorithmprovider/registry.go#getDefaultConfig()

```go
func getDefaultConfig() *schedulerapi.Plugins {
	return &schedulerapi.Plugins{
		QueueSort: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: queuesort.Name},
			},
		},
		PreFilter: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: noderesources.FitName},
				{Name: nodeports.Name},
				{Name: podtopologyspread.Name},
				{Name: interpodaffinity.Name},
				{Name: volumebinding.Name},
			},
		},
		Filter: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: nodeunschedulable.Name},
				{Name: noderesources.FitName},
				{Name: nodename.Name},
				{Name: nodeports.Name},
				{Name: nodeaffinity.Name},
				{Name: volumerestrictions.Name},
				{Name: tainttoleration.Name},
				{Name: nodevolumelimits.EBSName},
				{Name: nodevolumelimits.GCEPDName},
				{Name: nodevolumelimits.CSIName},
				{Name: nodevolumelimits.AzureDiskName},
				{Name: volumebinding.Name},
				{Name: volumezone.Name},
				{Name: podtopologyspread.Name},
				{Name: interpodaffinity.Name},
			},
		},
		PostFilter: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: defaultpreemption.Name},
			},
		},
		PreScore: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: interpodaffinity.Name},
				{Name: podtopologyspread.Name},
				{Name: tainttoleration.Name},
			},
		},
		Score: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: noderesources.BalancedAllocationName, Weight: 1},
				{Name: imagelocality.Name, Weight: 1},
				{Name: interpodaffinity.Name, Weight: 1},
				{Name: noderesources.LeastAllocatedName, Weight: 1},
				{Name: nodeaffinity.Name, Weight: 1},
				{Name: nodepreferavoidpods.Name, Weight: 10000},
				// Weight is doubled because:
				// - This is a score coming from user preference.
				// - It makes its signal comparable to NodeResourcesLeastAllocated.
				{Name: podtopologyspread.Name, Weight: 2},
				{Name: tainttoleration.Name, Weight: 1},
			},
		},
		Reserve: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: volumebinding.Name},
			},
		},
		PreBind: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: volumebinding.Name},
			},
		},
		Bind: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: defaultbinder.Name},
			},
		},
	}
}
```
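
The Score weights above matter at aggregation time: the framework normalizes every Score plugin's node scores into the [0, 100] range and then sums them per node, weighted by the plugin's Weight. A toy model of that final step (illustrative names and numbers only, not the framework's code):

```go
package main

import "fmt"

// scorer models one Score plugin after normalization: scores lie in [0, 100].
type scorer struct {
	weight int
	scores map[string]int64 // node name -> normalized score
}

func main() {
	plugins := []scorer{
		{weight: 1, scores: map[string]int64{"node1": 40, "node2": 90}},  // e.g. a weight-1 plugin
		{weight: 2, scores: map[string]int64{"node1": 100, "node2": 50}}, // e.g. PodTopologySpread at weight 2
	}

	// Weighted sum per node.
	total := map[string]int64{}
	for _, p := range plugins {
		for node, s := range p.scores {
			total[node] += int64(p.weight) * s
		}
	}

	// Pick the node with the highest combined score.
	best, bestScore := "", int64(-1)
	for node, s := range total {
		if s > bestScore {
			best, bestScore = node, s
		}
	}
	fmt.Println(best, bestScore) // node1 240
}
```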

Run

Default parameters are defined in k8s.io/kubernetes/pkg/scheduler/apis/config/v1alpha1/defaults.go; the main logic is kicked off by calling the Run method.
Run mainly does the following:

  • Initializes the scheduler object
  • Starts the kube-scheduler servers: kube-scheduler listens on ports 10251 and 10259; port 10251 requires no authentication and exposes healthz, metrics, and similar endpoints, while 10259 is the secure port and requires authentication (see the probe sketch after this list)
  • Starts all the informers
  • Calls sched.Run(), which runs the main scheduling logic
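
As a quick way to verify those two servers, you can probe the insecure port; a small sketch, assuming kube-scheduler is running locally with the default ports:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Port 10251 serves healthz and metrics without authentication.
	for _, path := range []string{"/healthz", "/metrics"} {
		resp, err := http.Get("http://127.0.0.1:10251" + path)
		if err != nil {
			fmt.Println(path, err)
			continue
		}
		body, _ := io.ReadAll(resp.Body)
		resp.Body.Close()
		fmt.Printf("%s -> %s (%d bytes)\n", path, resp.Status, len(body))
	}
}
```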

kubernetes/cmd/kube-scheduler/app/server.go

```go
// Run is the main logic
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
	// ...

	if cz, err := configz.New("componentconfig"); err == nil {
		cz.Set(cc.ComponentConfig)
	} else {
		return fmt.Errorf("unable to register configz: %s", err)
	}

	// Prepare the event broadcaster
	cc.EventBroadcaster.StartRecordingToSink(ctx.Done())

	// Set up the health checks
	var checks []healthz.HealthChecker
	if cc.ComponentConfig.LeaderElection.LeaderElect {
		checks = append(checks, cc.LeaderElection.WatchDog)
	}

	// Start the health-check servers
	if cc.InsecureServing != nil {
		separateMetrics := cc.InsecureMetricsServing != nil
		handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
		if err := cc.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil {
			return fmt.Errorf("failed to start healthz server: %v", err)
		}
	}
	if cc.InsecureMetricsServing != nil {
		handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil)
		if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil {
			return fmt.Errorf("failed to start metrics server: %v", err)
		}
	}
	if cc.SecureServing != nil {
		handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
		// TODO: handle stoppedCh returned by c.SecureServing.Serve
		if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
			// fail early for secure handlers, removing the old error loop from above
			return fmt.Errorf("failed to start secure server: %v", err)
		}
	}

	// Start all the informers
	go cc.PodInformer.Informer().Run(ctx.Done())
	cc.InformerFactory.Start(ctx.Done())

	// Wait for the caches to sync before scheduling
	cc.InformerFactory.WaitForCacheSync(ctx.Done())

	// If leader election is enabled, run the leader election
	if cc.LeaderElection != nil {
		cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
			OnStartedLeading: sched.Run,
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		}
		leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
		if err != nil {
			return fmt.Errorf("couldn't create leader elector: %v", err)
		}

		leaderElector.Run(ctx)

		return fmt.Errorf("lost lease")
	}

	// Run the scheduler directly (no leader election)
	sched.Run(ctx)
	return fmt.Errorf("finished without leader elect")
}
```
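
The leader-election block follows the standard client-go pattern: block in leaderElector.Run until the lease is acquired, run the scheduler in OnStartedLeading, and exit hard when leadership is lost. For reference, a minimal standalone sketch of that pattern with client-go (lock name, namespace, and identity are illustrative):

```go
package main

import (
	"context"
	"os"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog/v2"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		klog.Fatal(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// A Lease object in kube-system serves as the lock.
	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Name: "my-scheduler", Namespace: "kube-system"},
		Client:     client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: "my-scheduler-pod-1"},
	}

	// Like the scheduler: OnStartedLeading runs the main loop,
	// OnStoppedLeading terminates the process.
	leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) { /* sched.Run(ctx) */ },
			OnStoppedLeading: func() { klog.Fatal("leaderelection lost") },
		},
	})
}
```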