k8s 作为云原生最重要的基石之一,她是怎么运作的呢?你是否了解过她是怎么从众多的 node 节点中筛选出符合 pod 的调度节点,这里会从 k8s 的调度原理和流程开始结合源码内容带你了解整个调度过程,并配合一个小的调度实验,让你亲手实现一个简单的k8s调度器。
PS:本文有些长,有兴趣的同学可以先收藏再阅读
k8s 的 scheduler 和 controller manager,kubelet 这些是一样的,都是针对 apiserver 进行控制循环的操作。
以上是一个简单job的创建流程为例。这里面的kube-scheduler
调度器就是我们今天带大家了解的k8s基础组件之一 —— k8s的调度器。
通过上面我们知道,kube-scheduler 主要是负责将 pod 绑定到适合的 node 上面,那么 kube-scheduler 是怎么选择适合的 node 节点的呢? 这里提供了一副 kube-scheduler 调度的全景图:
整个事件流程如下:
在源码解读这小节我会把 kube-scheduler分成三部分,第一部分是 scheduleOne,也就是调度器的主线逻辑,第二部分是 Algorithm,也就是调度阶段的核心流程。
本章节源码基于 kuberenesv1.19版本,commit id: 070ff5e3a98bc3ecd596ed62bc456079bcff0290
先对整个 kube-scheduler 的源码解析图和 scheduler 对象有个初步的认识,方便我们后续查阅:
type Scheduler struct {
// cache缓存,用来优化调度器性能的
SchedulerCache internalcache.Cache
// 调度算法
Algorithm ScheduleAlgorithm
// framework扩展
Extenders []framework.Extender
// 获取下一个需要调度的pod
NextPod func() *framework.QueuedPodInfo
Error func(*framework.QueuedPodInfo, error)
// 通过flag停止调度器.
StopEverything <-chan struct{}
// 调度队列
SchedulingQueue internalqueue.SchedulingQueue
// 调度范围,主要用于判断哪些pod能被这个调度器调度
Profiles profile.Map
// client-go,用于和api-server通信
client clientset.Interface
}
上面是调度器的组件,下面我们再看看调度框架包涵哪些:
// frameworkImpl is the component responsible for initializing and running scheduler
// plugins.
type frameworkImpl struct {
registry Registry
snapshotSharedLister framework.SharedLister
waitingPods *waitingPodsMap
pluginNameToWeightMap map[string]int
// Plugins插入扩展点
queueSortPlugins []framework.QueueSortPlugin
preFilterPlugins []framework.PreFilterPlugin
filterPlugins []framework.FilterPlugin
postFilterPlugins []framework.PostFilterPlugin
preScorePlugins []framework.PreScorePlugin
scorePlugins []framework.ScorePlugin
reservePlugins []framework.ReservePlugin
preBindPlugins []framework.PreBindPlugin
bindPlugins []framework.BindPlugin
postBindPlugins []framework.PostBindPlugin
permitPlugins []framework.PermitPlugin
clientSet clientset.Interface
eventRecorder events.EventRecorder
informerFactory informers.SharedInformerFactory
metricsRecorder *metricsRecorder
profileName string
// 用于抢占
preemptHandle framework.PreemptHandle
runAllFilters bool
}
基于上面 kube-scheduler 的源码解析图,我们知道 scheduleOne 的流程如下:
下面是 ScheduleOne 的源码及注释:
func (sched *Scheduler) scheduleOne(ctx context.Context) {
// 从 scheduleQueue 获取需要调度的 pod
podInfo := sched.NextPod()
...
// 通过 pod 的 SchedulerName 判断是否属于这个调度器处理
prof, err := sched.profileForPod(pod)
// 不属于则跳过
if sched.skipPodSchedule(prof, pod) {
return
}
...
// 通过一系列调度算法找到当前 pod 最适合的节点
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
if err != nil {
...
// 如果没有适合的节点,pod会进入抢占流程尝试进行抢占
result, status := prof.RunPostFilterPlugins(ctx, state, pod, fitError.FilteredNodesStatuses)
if status.IsSuccess() && result != nil {
nominatedNode = result.NominatedNodeName
}
return
}
...
// 复制原来的pod信息,设置一个 assume pod 对原pod进行拷贝设置一份缓存,
// 并设置 assume pod 的 nodeName 来标识 pod 完成调度,
// 后续scheduler的其他Plugin做检查和绑定操作全部基于 assume pod
assumedPodInfo := podInfo.DeepCopy()
assumedPod := assumedPodInfo.Pod
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
...
// reserve plugins 主要做附属资源的预留,比如在 cache 中
// 完成将预设卷和 pod 进行绑定
if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
// 如果失败会进行回滚操作
prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
return
}
...
// permit 是对 pod 绑定操作进行最后的批准、拒绝或者执行延时调度
runPermitStatus := prof.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
...
// 这里开始正式进入 pod 绑定阶段
go func() {
// 通过和 permit 结合完成延时调度
waitOnPermitStatus := prof.WaitOnPermit(bindingCycleCtx, assumedPod)
...
// prebind 主要准备节点绑定前准备工作,比如PVC和PV绑定,现在默认 prebind 插件只有一个 VolumeBinding
preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
...
// 向 api-server 发起节点绑定请求
err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
if err != nil {
// 如果失败会进行回滚操作
prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
} else {
// 如果成功后会执行PostBindPlugins, k8s暂时没有默认插件
prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
}
}()
}
这里面从 RunReservePluginsReserve,RunPermitPlugins,RunPreBindPlugins,RunPreBindPlugins 到 RunPostBindPlugins 都支持用户编写自己的插件扩展 scheduler 调度器。
对照 scheduler framework 官方图解:
详细可以参考kube-scheduler#624提案
我们看看 k8s 的提供的一些默认插件:
func getDefaultConfig() *schedulerapi.Plugins {
return &schedulerapi.Plugins{
// 调度队列排序
QueueSort: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: queuesort.Name},
},
},
// 预过滤
PreFilter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: noderesources.FitName},
{Name: nodeports.Name},
{Name: podtopologyspread.Name},
{Name: interpodaffinity.Name},
{Name: volumebinding.Name},
},
},
// 过滤
Filter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
// 检查节点是否不可调度
{Name: nodeunschedulable.Name},
// 检查节点的空闲资源(例如,CPU和内存)是否满足 Pod 的要求
{Name: noderesources.FitName},
// 检查 Pod 是否通过主机名指定了 Node
{Name: nodename.Name},
// 检查 Pod 请求的端口(网络协议类型)在节点上是否可用
{Name: nodeports.Name},
// 检查 Pod 的亲和性,是否存在硬亲和
{Name: nodeaffinity.Name},
{Name: volumerestrictions.Name},
// 污浊节点检查
{Name: tainttoleration.Name},
{Name: nodevolumelimits.EBSName},
{Name: nodevolumelimits.GCEPDName},
{Name: nodevolumelimits.CSIName},
{Name: nodevolumelimits.AzureDiskName},
// 基于 Pod 的卷的绑定请求,评估 Pod 是否适合节点
{Name: volumebinding.Name},
// 基于 Pod 的卷的zone属性来筛选是否适合节点
{Name: volumezone.Name},
// pod 的拓扑扩展约束判断节点是否适合
{Name: podtopologyspread.Name},
// pod 之间的亲和性
{Name: interpodaffinity.Name},
},
},
// 过滤后
PostFilter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: defaultpreemption.Name},
},
},
// 预打分
PreScore: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: interpodaffinity.Name},
{Name: podtopologyspread.Name},
{Name: tainttoleration.Name},
},
},
// 优先级打分
Score: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
// 资源平衡
{Name: noderesources.BalancedAllocationName, Weight: 1},
// 偏向已在本地缓存 Pod 所需容器镜像的节点
{Name: imagelocality.Name, Weight: 1},
// 实现了 Pod 间亲和性与反亲和性的优先级
{Name: interpodaffinity.Name, Weight: 1},
// 偏向最少请求资源的节点
{Name: noderesources.LeastAllocatedName, Weight: 1},
// 根据节点亲和中 PreferredDuringSchedulingIgnoredDuringExecution 字段对节点进行优先级排序
{Name: nodeaffinity.Name, Weight: 1},
// 根据节点的注解 scheduler.alpha.kubernetes.io/preferAvoidPods 对节点进行优先级排序
{Name: nodepreferavoidpods.Name, Weight: 10000},
// 根据 Pod 拓扑扩展约束的优先级排序
{Name: podtopologyspread.Name, Weight: 2},
// 根据节点上无法忍受的污点数量,给所有节点进行优先级排序
{Name: tainttoleration.Name, Weight: 1},
},
},
// pod预准备
Reserve: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: volumebinding.Name},
},
},
// 预绑定
PreBind: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: volumebinding.Name},
},
},
// 绑定
Bind: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: defaultbinder.Name},
},
},
}
}
Algorithm 是 scheduler 的调度的核心,包涵了一个过滤器和一个打分器,核心逻辑就是把所有适合的节点筛选出来,在再里面找出最优的节点,下面看下 Algorithm 的代码,kube-default 的 algorithm 对象是由一个叫 genericScheduler 的实例实现:
func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
// 做基本的检查,主要是检查 pod 的 pvc 在命名空间下是否存在
if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
return result, err
}
...
// 将 scheduler cache 和 node info 做一次镜像,方面后续使用可以做到无锁
if err := g.snapshot(); err != nil {
return result, err
}
...
// 找到适合 pod 部署的所有节点
feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
...
// 将适合的 pod 按照规则进行打分
priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes)
...
// 找出得分最高的节点
host, err := g.selectHost(priorityList)
return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses),
FeasibleNodes: len(feasibleNodes),
}, err
}
下面来看看findNodesThatFitPod
方法:
findNodesThatFitPod 比较简单,包涵三个方法,一个是 prefilter 插入点,一个是 filter 插入点,还有一个是 extender filter。
prefilter 主要是做一些过滤前的预处理,比如 node port信息, volumebinding 信息等。
filter 对节点做过滤,找出适合的框架,这里会检查节点的亲和性,资源是否充足,是否存在挂载卷等。
extender 这个是旧版调度器架构的扩展方式,这里就不累赘,有兴趣的可以自行学习。
// node过滤方法
func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
// filter会通过 numFeasibleNodesToFind 确定要过滤得到的节点数量大小,主要防止大集群并有大量符合的节点
// 导致性能问题,当集群过大的时候 numFeasibleNodesToFind 会根据集群规模确定一个比例
numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes)))
// Create feasible list with enough space to avoid growing it
// and allow assigning.
feasibleNodes := make([]*v1.Node, numNodesToFind)
// 确保是否存在过滤插件
if !prof.HasFilterPlugins() {
...
}
checkNode := func(i int) {
nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)]
// PodPassesFiltersOnNode 方法会调用 filter plugin 对 node 进行处理获得适合的节点
fits, status, err := PodPassesFiltersOnNode(ctx, prof.PreemptHandle(), state, pod, nodeInfo)
...
}
// 对所有节点并行执行过滤
parallelize.Until(ctx, len(allNodes), checkNode)
...
return feasibleNodes, nil
}
// filter plugins 处理
func PodPassesFiltersOnNode(
ctx context.Context,
ph framework.PreemptHandle,
state *framework.CycleState,
pod *v1.Pod,
info *framework.NodeInfo,
) (bool, *framework.Status, error) {
...
statusMap := ph.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
}
在通过findNodesThatFitPod
方法获得适合分配的节点后,需要通过prioritizeNodes
方法来打分找到最适合的节点:
findNodesThatFitPod主要包括 PreScore 和 ScorePlugins两个插入点
RunScorePlugins 方法会先对节点进行打分,然后再对打分插件的打分进行修正,最后乘以各插件的权重系数就得到各插件打分的最终分数
最后再将各种插件打分的结果汇总得到节点的总分表
ScorePlugins打分插件包括:
// 优先级打分
Score: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
// 资源平衡
{Name: noderesources.BalancedAllocationName, Weight: 1},
// 偏向已在本地缓存 Pod 所需容器镜像的节点
{Name: imagelocality.Name, Weight: 1},
// 实现了 Pod 间亲和性与反亲和性的优先级
{Name: interpodaffinity.Name, Weight: 1},
// 偏向最少请求资源的节点
{Name: noderesources.LeastAllocatedName, Weight: 1},
// 根据节点亲和中 PreferredDuringSchedulingIgnoredDuringExecution 字段对节点进行优先级排序
{Name: nodeaffinity.Name, Weight: 1},
// 根据节点的注解 scheduler.alpha.kubernetes.io/preferAvoidPods 对节点进行优先级排序
{Name: nodepreferavoidpods.Name, Weight: 10000},
// 根据节点上无法忍受的污点数量,给所有节点进行优先级排序
{Name: podtopologyspread.Name, Weight: 2},
// 根据 Pod 拓扑扩展约束的优先级排序
{Name: tainttoleration.Name, Weight: 1},
},
},
k8s调度器从1.15开始由 extension 模式改成了 framework 的架构,kube-scheduler整个代码架构提供了更灵活性定制化能力,可以在原架构上满足了更灵活定制化的需求,而不需要重新 fork 一份源码来修改。