LLLeon / Blog

LLLeon 的部落格
15 stars 4 forks source link

Kubernetes scheduler 源码阅读 #23

Open LLLeon opened 3 years ago

LLLeon commented 3 years ago

Kubernetes scheduler 源码阅读

本文基于 Kubernetes 1.20 版本。

Kubernetes 调度器负责将 Pod 调度到集群内的节点上,它监听 API Server,查询还未分配 Node 的 Pod,然后根据调度策略为这些 Pod 分配节点(更新 Pod 的 NodeName 字段)。

0. 调度框架简介

调度框架是 Kubernetes Scheduler 的一种可插入架构,可以简化调度器的自定义。 它向现有的调度器增加了一组新的“插件” API。插件被编译到调度器程序中。 这些 API 允许大多数调度功能以插件的形式实现,同时使调度“核心”保持简单且可维护。

调度框架定义了一些扩展点。调度器插件注册后在一个或多个扩展点处被调用。 这些插件中的一些可以改变调度决策,而另一些仅用于提供信息。

每次调度一个 Pod 的尝试都分为两个阶段,即 调度周期绑定周期

scheduling-framework-extensions

以上简介内容来自官方文档,下面开始看源码。

Framework是一个接口,要实现此接口需要实现上面提到的各扩展点的方法:

framework 结构实现了 Framework 接口:

type framework struct {
    registry              Registry
    snapshotSharedLister  schedulerlisters.SharedLister
    waitingPods           *waitingPodsMap
    pluginNameToWeightMap map[string]int
    queueSortPlugins      []QueueSortPlugin
    preFilterPlugins      []PreFilterPlugin
    filterPlugins         []FilterPlugin
    preScorePlugins       []PreScorePlugin
    scorePlugins          []ScorePlugin
    reservePlugins        []ReservePlugin
    preBindPlugins        []PreBindPlugin
    bindPlugins           []BindPlugin
    postBindPlugins       []PostBindPlugin
    unreservePlugins      []UnreservePlugin
    permitPlugins         []PermitPlugin

    clientSet       clientset.Interface
    informerFactory informers.SharedInformerFactory
    volumeBinder    *volumebinder.VolumeBinder

    metricsRecorder *metricsRecorder

    // Indicates that RunFilterPlugins should accumulate all failed statuses and not return
    // after the first failure.
    runAllFilters bool
}

1. 启动 kube-scheduler 进程

首先找到 scheduler 的入口:cmd/kube-scheduler/scheduler.go 中的 main 函数,代码编译后,可通过终端命令启动程序。

command := app.NewSchedulerCommand()
// ...
if err := command.Execute(); err != nil {
        os.Exit(1)
    }

进入下一个函数:cmd/kube-scheduler/app/server.go 中的 Run 函数:

Run 函数做的是基于给定的配置信息和 registryOptions 运行 scheduler,只有在出错或终端命令终止时退出:

// 因为 runCommand() 中设置了 context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

简单看一下 Scheduler 里面几个关键字段,先不必深入细节:

// 监控未调度的 Pod, 找到合适的节点, 并将绑定信息写回 API Server.
type Scheduler struct {
  // 通过 SchedulerCache 所做的变更会被 NodeLister 和 Algorithm 观察到.
    SchedulerCache internalcache.Cache
    Algorithm core.ScheduleAlgorithm
    podConditionUpdater podConditionUpdater

  // 用来驱逐 Pods 和更新抢占者 Pod 的 NominatedNode 字段.
    podPreemptor podPreemptor

  // 函数实现应该是阻塞的, 直到有可用的 Pod 才返回.
  // 此函数不使用 channel, 因为调度 Pod 会花一些时间, Pod 放在 channel 中可能会时间过长而 stale.
    NextPod func() *framework.PodInfo
    Error func(*framework.PodInfo, error)

    // Close this to shut down the scheduler.
    StopEverything <-chan struct{}

  // 处理 Pod PVC/PV 的绑定
    VolumeBinder *volumebinder.VolumeBinder

  // 是否禁用 Pod 抢占.
    DisablePreemption bool

  // 待调度 Pod 的队列, 后面会详细介绍
    SchedulingQueue internalqueue.SchedulingQueue

    // Profiles are the scheduling profiles.
    Profiles profile.Map
    scheduledPodsHasSynced func() bool
}

回到主线,Run 函数:

// 初始化调度框架的插件注册表
// ...
// 创建 Scheduler 结构
sched, err := scheduler.New(cc.Client,
        cc.InformerFactory,
        cc.PodInformer,
        recorderFactory,
        ctx.Done(),
        scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
        scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
        scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
        scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
        scheduler.WithBindTimeoutSeconds(cc.ComponentConfig.BindTimeoutSeconds),
        scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
        scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
        scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
    )
// 准备事件广播器
// 设置 healthz 检查并启动该服务
// 另起一个 goroutine 运行 informers
// ...
// 运行 scheduler
sched.Run(ctx)

sched.Run 方法开始进行 Pod 调度:

// 执行监听和调度, 等待缓存同步完毕后, 阻塞的执行调度,直到 context cancel。
func (sched *Scheduler) Run(ctx context.Context) {
    if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
        return
    }
  // Pod 队列操作, 里面起了两个 goroutine
    sched.SchedulingQueue.Run()
  // sched.scheduleOne 实现 Pod 的调度逻辑
    wait.UntilWithContext(ctx, sched.scheduleOne, 0)
    sched.SchedulingQueue.Close()
}

下面分开看 Pod 队列的操作和 Pod 调度。

2. 优先级队列 PriorityQueue

sched.SchedulingQueue.Run() 方法将 Pod 在不同的队列中进行移动。SchedulingQueue 是一个 interface,实现此接口的队列用来存储等待调度的 Pod。

实现此接口的结构是 PriorityQueue,它主要包含三个队列:

简单看一下数据结构:

type PriorityQueue struct {
    lock sync.RWMutex // 此结构是非线程安全的

  // Heap 是一个 map+slice 实现的队列.
    activeQ *heap.Heap
    podBackoffQ *heap.Heap
    unschedulableQ *UnschedulablePodsMap

  // nominatedPods 是一个 map, 用来存储被提名的在节点上运行的 Pod.
    nominatedPods *nominatedPodMap
  // 表示调度周期的序号, 当有一个 Pod 从 activeQ 弹出时将加 1.
    schedulingCycle int64
  // 收到一个移动请求时, 缓存调度周期的序号. 
  // 当收到移动请求时, 如果尝试去调度在这个调度周期之前和之中的不可调度的 Pod, 它们将被放回到 activeQueue.
    moveRequestCycle int64
}

实际执行的是 PriorityQueue 的 Run 方法,它负责将 Pod 在三个队列中进行移动:

// wait.Until 函数来保证 flushBackoffQCompleted 和 flushUnschedulableQLeftover 两个方法失败时会不断重试.
func (p *PriorityQueue) Run() {
  // 每 1 秒执行一次 flushBackoffQCompleted 方法, 直到收到停止信号.
    go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
  // 每 30 秒执行一次 flushUnschedulableQLeftover 方法, 直到收到停止信号.
    go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}

PriorityQueue.flushBackoffQCompleted 方法做的事情是把 BackoffQ 里面到达重试时间的 Pod 放回到 activeQ:

func (p *PriorityQueue) flushBackoffQCompleted() {
    // 省略加解锁代码(什么情况下会发生锁竞争?)
  for {
    rawPodInfo := p.podBackoffQ.Peek()

    // ...
    // 查看队列头部的 Pod 是否到达重新调度时间 (根据尝试次数计算), 未到达则 return.
    // 这里是先查看 Pod 符合 pop 的条件后才执行真正的 pop 动作.
    // ...

        _, err := p.podBackoffQ.Pop()
    // 入 activeQ 队列
        p.activeQ.Add(rawPodInfo)
    // 唤醒所有等待从 activeQ 队列 pop Pod 的 goroutine (sched.scheduleOne 方法会调用 sched.NextPod() 来获取 Pod)
        defer p.cond.Broadcast()
    }
}

PriorityQueue.flushUnschedulableQLeftover 方法会把在 unschedulableQ 队列中存放时间超过 unschedulableQTimeInterval (60 秒) 的 Pod 移到 podBackoffQ 或 activeQ。

func (p *PriorityQueue) flushUnschedulableQLeftover() {
  // 加锁
    // ...
  // 遍历 map 寻找到达重是时间的 Pod 放入 podsToMove
  // ...

    if len(podsToMove) > 0 {
    // 主要逻辑, 正在等待 backoff 时间的 Pod 会放入 podBackoffQ, 否则放入 activeQ
    // UnschedulableTimeout 表示一个事件, 用来统计 metrics
        p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
    }
}

接下来看 Pod 真正的调度逻辑。

3. Pod 调度

Pod 的调度逻辑在 Scheduler.scheduleOne 方法中实现。大体思路是先找到合适的节点,缓存必要信息,假定 Pod 已经运行在该节点上,真正的绑定操作是异步进行的。

看一下它的调度逻辑:

  1. 从队列中获取一个待调度的 Pod。

  2. 获取此 Pod 所属调度器的 Profile,包括根据给定的配置创建的 Framework:

    // pkg/scheduler/profile/profile.go
    type Profile struct {
    framework.Framework
    Recorder events.EventRecorder
    }
    
    // pkg/scheduler/apis/config/types.go
    type KubeSchedulerProfile struct {
    SchedulerName string
     // 调度器要用到的插件
    Plugins *Plugins
    PluginConfig []PluginConfig
    }
  3. 创建一个 CycleState 结构供插件读写数据,各插件的数据可以互相读写。

    // pkg/scheduler/framework/v1alpha1/cycle_state.go
    type CycleState struct {
    mx      sync.RWMutex
    storage map[StateKey]StateData
    // if recordPluginMetrics is true, PluginExecutionDuration will be recorded for this cycle.
    recordPluginMetrics bool
    }
  4. 调用 sched.Algorithm.Schedule() 方法,经过调用一系列插件的过滤及打分,得到一个符合的节点。

    • 运行 Prefilter 插件集。
    • 调用 findNodesThatFitPod() 方法,经过 Filter 和 Extender 的过滤,得到符合条件的节点列表。
    • 运行 Prescore 插件集,都成功后进行后续逻辑。
    • 调用 prioritizeNodes() 方法执行 Score 插件集,也可以运行任何的 extender。
      • 每个插件的分数加在一起,就是一个节点的总分。
      • 返回 NodeScore 列表。
    • 选择一个得分最高的节点。
  5. 复制一份 Pod 信息,假定该 Pod 已经运行在选定的节点上,即使还没有绑定它们。这样调度器可以继续调度其它 Pod,而无需等待绑定操作完成(绑定操作是异步进行的)。

  6. 调用 AssumePodVolumes() 方法缓存 Pod 的节点选择。如果需要 PVC 绑定,则只在内存中缓存。

    • AssumePodVolumes 将把缓存的匹配 PV 和 PVC 提供给 podBindingCache 中的所选节点。
    • 用新的预绑定 PV 更新 pvCache。
    • 用新的带 annotations 集合的 PVC 更新 pvcCache。
    • 用为 PV 和 PVC 缓存的 API 更新再次更新 podBindingCache 。
  7. 运行 Reserve 插件。

  8. 调用 assume 方法来把 assumedPod 缓存起来,缓存前它会设置 assumed.Spec.NodeName = scheduleResult.SuggestedHost,即所谓的 Pod 绑定信息。

  9. 调用 RunPermitPlugins() 方法运行 Permit 插件集。Permit 插件用于防止或延迟 Pod 的绑定。

    • 如果返回的不是 Success 或 Wait,将不会继续执行剩下的 Permit 插件并返回错误。
    • 如果有任一插件返回的是 Wait,在所有插件运行完后,会创建一个 waitingPod(已经开始) 并将其放入 waitingPods map 里(后面会用到),随后返回。
    • 如果都返回 Success,则继续后面的异步绑定操作。
  10. 起一个 goroutine 进行绑定:

    • 执行 WaitOnPermit 方法在 Permit 阶段等待 Pod。
    • 绑定 Volumes。
      • 它使用前面假定的绑定更新 API,并等待 PV 控制器完成绑定操作。
      • 如果绑定失败,触发 un-reserve 插件来清除 Reserved Pod 的相关状态。
    • 执行 prebind 插件。同上面一样,失败后会触发 un-reserve 插件来清除 Reserved Pod 的相关状态。
    • 执行 bind 操作。
      • 绑定的优先级:先执行 extender 再执行 framework 的 Bind 插件。其实就是把 Pod 与 Node 的绑定信息发送给 API Server 处理。
      • 绑定成功后,会调用 finishBinding() 方法使缓存的 Pod 过期。
    • 如果绑定失败,触发 un-reserve 插件来清除 Reserved Pod 的相关状态。
    • 执行 Postbind 插件。

至此,调度一个 Pod 的逻辑就梳理完毕了,不过,还有一些细节需要再梳理。

4. 参考

  1. 官方文档