The previous post walked through the source code of each predicate (预选) algorithm; this one takes a detailed look at the priority (优选) strategies. This article took a while to finish, as I have been busy lately.
Predicate (预选) | Description |
---|---|
CheckNodeUnschedulablePred | whether the node is schedulable |
GeneralPred | checks that resources are sufficient and that the pod's host, ports, and selector match |
HostNamePred | whether the node name requested by the pod equals this node's name |
PodFitsHostPortsPred | whether the host ports requested by the pod are already occupied on the node |
MatchNodeSelectorPred | NodeSelector and node-affinity (label) matching |
PodFitsResourcesPred | resource check |
NoDiskConflictPred | whether the volumes to be mounted conflict with volumes already on the node |
PodToleratesNodeTaintsPred | whether the pod's tolerations can tolerate the node's taints |
PodToleratesNodeNoExecuteTaintsPred | whether the pod tolerates the node's NoExecute taints |
CheckNodeLabelPresencePred | whether the required node labels are present |
CheckServiceAffinityPred | - |
MaxEBSVolumeCountPred | deprecated |
MaxGCEPDVolumeCountPred | deprecated |
MaxCSIVolumeCountPred | whether the node's volume count exceeds the maximum |
MaxAzureDiskVolumeCountPred | deprecated |
MaxCinderVolumeCountPred | deprecated |
CheckVolumeBindingPred | whether the node's PVs can satisfy the pod's PVCs |
NoVolumeZoneConflictPred | whether the volume's zone conflicts with the node |
EvenPodsSpreadPred | whether the node satisfies the pod's topology spread constraints |
MatchInterPodAffinityPred | whether scheduling here would violate pod affinity or anti-affinity |
Priority (优选) | Description |
---|---|
EqualPriority | equal weight for all nodes; skipped outright during configuration |
MostRequestedPriority | the larger the ratio of requested to allocatable resources, the higher the score |
RequestedToCapacityRatioPriority | score computed piecewise (segmented) from the requested-to-capacity ratio |
SelectorSpreadPriority | spreads pods of the same Service, RC, RS, or StatefulSet across nodes as much as possible; zone-aware as well |
ServiceSpreadingPriority | same as above, but only matches Services |
InterPodAffinityPriority | controls which pods are spread across different topology domains |
LeastRequestedPriority | scores by the lowest requested utilization; almost the opposite of MostRequestedPriority |
BalancedResourceAllocation | balances cpu, memory, and volume requests; must be used together with LeastRequestedPriority |
NodePreferAvoidPodsPriority | scores by the node annotation scheduler.alpha.kubernetes.io/preferAvoidPods |
NodeAffinityPriority | scores by node affinity; each matching term adds its weight |
TaintTolerationPriority | scores by the node's taints and the pod's tolerations with effect PreferNoSchedule |
ImageLocalityPriority | whether the required images are already on the node; the score depends on image size and spread |
ResourceLimitsPriority | whether the node's resources satisfy the pod's limits |
EvenPodsSpreadPriority | scores by the number of pods that match the topology spread constraints |
First, a recap of how the priority phase runs. For each pod, the Map step is executed by 16 goroutines in parallel; each Map invocation walks through all of the configured priority algorithms and fills in the per-algorithm, per-node result matrix. The Reduce steps are then run concurrently (some algorithms have no Reduce). Map uses workqueue.ParallelizeUntil; why doesn't Reduce use ParallelizeUntil as well? That is worth pondering, and ParallelizeUntil itself is worth a close read (a small sketch of it follows the code below). Personally I am not fond of this code: Go's design philosophy recommends not communicating by sharing memory but sharing memory by communicating, yet here the results are still shared via shared memory. After Map-Reduce, the scores are multiplied by each algorithm's weight and summed, the extenders are then consulted, and the weighted total becomes the final score. The priority algorithms have no ordering array like the predicates do, but they are still invoked in the order they were configured.
func (g *genericScheduler) prioritizeNodes(
ctx context.Context,
state *framework.CycleState,
pod *v1.Pod,
meta interface{},
nodes []*v1.Node,
) (framework.NodeScoreList, error) {
// If no priority configs are provided, then all nodes will have a score of one.
// This is required to generate the priority list in the required format
if len(g.prioritizers) == 0 && len(g.extenders) == 0 && !g.framework.HasScorePlugins() {
... ...
workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
nodeInfo := g.nodeInfoSnapshot.NodeInfoMap[nodes[index].Name]
for i := range g.prioritizers {
var err error
results[i][index], err = g.prioritizers[i].Map(pod, meta, nodeInfo)
if err != nil {
appendError(err)
results[i][index].Name = nodes[index].Name
}
}
})
for i := range g.prioritizers {
if g.prioritizers[i].Reduce == nil {
continue
}
wg.Add(1)
go func(index int) {
metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Inc()
defer func() {
metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Dec()
wg.Done()
}()
if err := g.prioritizers[index].Reduce(pod, meta, g.nodeInfoSnapshot, results[index]); err != nil {
appendError(err)
}
... ...
}(i)
}
... ...
for i := range nodes {
result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
for j := range g.prioritizers {
result[i].Score += results[j][i].Score * g.prioritizers[j].Weight
}
for j := range scoresMap {
result[i].Score += scoresMap[j][i].Score
}
}
if len(g.extenders) != 0 && nodes != nil {
... ...
go func(extIndex int) {
... ...
prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes)
... ...
for i := range *prioritizedList {
host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
... ...
combinedScores[host] += score * weight
}
mu.Unlock()
}(i)
}
... ...
return result, nil
}
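Since the prose above points at ParallelizeUntil, here is a minimal, self-contained sketch (not scheduler code; node names and the scoring are made up) of how workqueue.ParallelizeUntil from k8s.io/client-go splits index-based work across a fixed number of workers:
package main

import (
	"context"
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	nodes := []string{"node-a", "node-b", "node-c", "node-d"}
	scores := make([]int64, len(nodes)) // one slot per node

	// 16 workers pull indices in [0, len(nodes)) until every piece has been
	// processed or the context is cancelled.
	workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(i int) {
		// Each invocation writes only to its own index, which is why the
		// scheduler can fill results[i][index] concurrently without a mutex.
		scores[i] = int64(len(nodes[i])) // dummy "score"
	})

	fmt.Println(scores) // [6 6 6 6]
}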
EqualPriority does nothing; it is skipped outright when the list of priority algorithms is built during initialization:
for _, priority := range policy.Priorities {
if priority.Name == priorities.EqualPriority {
klog.V(2).Infof("Skip registering priority: %s", priority.Name)
continue
}
klog.V(2).Infof("Registering priority: %s", priority.Name)
priorityKeys.Insert(RegisterCustomPriorityFunction(priority, c.configProducerArgs))
}
MostRequestedPriority is registered as follows:
scheduler.RegisterPriorityMapReduceFunction(priorities.MostRequestedPriority, priorities.MostRequestedPriorityMap, nil, 1)
The scoring is done in PriorityMap:
func (r *ResourceAllocationPriority) PriorityMap(
pod *v1.Pod,
meta interface{},
nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
node := nodeInfo.Node()
if node == nil {
return framework.NodeScore{}, fmt.Errorf("node not found")
}
if r.resourceToWeightMap == nil {
return framework.NodeScore{}, fmt.Errorf("resources not found")
}
requested := make(ResourceToValueMap, len(r.resourceToWeightMap))
allocatable := make(ResourceToValueMap, len(r.resourceToWeightMap))
for resource := range r.resourceToWeightMap {
allocatable[resource], requested[resource] = calculateResourceAllocatableRequest(nodeInfo, pod, resource)
}
var score int64
// Check if the pod has volumes and this could be added to scorer function for balanced resource allocation.
if len(pod.Spec.Volumes) >= 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {
score = r.scorer(requested, allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)
} else {
score = r.scorer(requested, allocatable, false, 0, 0)
}
... ...
return framework.NodeScore{
Name: node.Name,
Score: score,
}, nil
}
Keep the PriorityMap method above in mind; several of the later algorithms go through it as well.
For MostRequestedPriority, the scorer ultimately calls the following function:
// The used capacity is calculated on a scale of 0-10
func mostRequestedScore(requested, capacity int64) int64 {
if capacity == 0 {
return 0
}
if requested > capacity {
return 0
}
return (requested * framework.MaxNodeScore) / capacity
}
The function is very simple. framework.MaxNodeScore is a constant whose value is 100, which does not match the 0-10 range mentioned in the comment; the range was presumably extended later and the comment is now out of date. The score is the share of allocatable capacity that is requested: the larger the share, the higher the score, and if requested == capacity the score is 100. This policy is almost the exact opposite of LeastRequestedPriority. The source comment also gives a simple example (see the worked sketch below):
(cpu(10 * sum(requested) / capacity) + memory(10 * sum(requested) / capacity)) / 2
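As a quick worked example (the numbers are made up and each resource has weight 1), here is the same computation written out as a standalone snippet:
package main

import "fmt"

// Mirrors mostRequestedScore above; framework.MaxNodeScore == 100.
func mostRequestedScore(requested, capacity int64) int64 {
	if capacity == 0 || requested > capacity {
		return 0
	}
	return requested * 100 / capacity
}

func main() {
	cpu := mostRequestedScore(2000, 4000)   // 2 of 4 CPUs (in milliCPU) -> 50
	mem := mostRequestedScore(4<<30, 8<<30) // 4Gi requested of 8Gi allocatable -> 50
	fmt.Println((cpu + mem) / 2)            // equal weights -> 50
}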
RequestedToCapacityRatioPriority is registered as follows:
scheduler.RegisterPriorityMapReduceFunction(
priorities.RequestedToCapacityRatioPriority,
priorities.RequestedToCapacityRatioResourceAllocationPriorityDefault().PriorityMap,
nil,
1)
The call flow is similar to MostRequestedPriority; the interesting part is RequestedToCapacityRatioPriority's scorer, built by buildRequestedToCapacityRatioScorerFunction:
func buildRequestedToCapacityRatioScorerFunction(scoringFunctionShape FunctionShape, resourceToWeightMap ResourceToWeightMap) func(ResourceToValueMap, ResourceToValueMap, bool, int, int) int64 {
rawScoringFunction := buildBrokenLinearFunction(scoringFunctionShape)
err := validateResourceWeightMap(resourceToWeightMap)
if err != nil {
klog.Error(err)
}
resourceScoringFunction := func(requested, capacity int64) int64 {
if capacity == 0 || requested > capacity {
return rawScoringFunction(maxUtilization)
}
return rawScoringFunction(maxUtilization - (capacity-requested)*maxUtilization/capacity)
}
return func(requested, allocable ResourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
var nodeScore, weightSum int64
for resource, weight := range resourceToWeightMap {
resourceScore := resourceScoringFunction(requested[resource], allocable[resource])
if resourceScore > 0 {
nodeScore += resourceScore * weight
weightSum += weight
}
}
if weightSum == 0 {
return 0
}
return int64(math.Round(float64(nodeScore) / float64(weightSum)))
}
}
func buildBrokenLinearFunction(shape FunctionShape) func(int64) int64 {
n := len(shape)
return func(p int64) int64 {
for i := 0; i < n; i++ {
if p <= shape[i].Utilization {
if i == 0 {
return shape[0].Score
}
return shape[i-1].Score + (shape[i].Score-shape[i-1].Score)*(p-shape[i-1].Utilization)/(shape[i].Utilization-shape[i-1].Utilization)
}
}
return shape[n-1].Score
}
}
The score here is computed by buildBrokenLinearFunction, which interpolates piecewise over resource utilization. The default shape defines two points:
Utilization | Score |
---|---|
0 | 100 |
100 | 0 |
Utilization is computed as:
maxUtilization - (capacity-requested)*maxUtilization/capacity
maxUtilization is 100, so the formula simplifies to:
maxUtilization * requested / capacity
In other words, the more resources are requested, the higher the utilization and the lower the score. So how is this different from LeastRequestedPriority? The score is interpolated linearly between the points of the shape. With the default two points it is effectively 100 minus the utilization, but users can define their own points. For example, if you add a point with score 80 at utilization 50, then nodes below 50% utilization score between 80 and 100 (all fairly high), while nodes above 50% fall into the 0-80 range; a small change in utilization around that knee produces a large difference in score. A sketch of this interpolation follows.
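To make the knee behavior concrete, here is a small self-contained sketch (local types and made-up shape points, not the scheduler's FunctionShape) of the same broken-linear interpolation:
package main

import "fmt"

// point: at Utilization (0-100) the score is Score.
type point struct{ Utilization, Score int64 }

// brokenLinear mirrors buildBrokenLinearFunction above: linear interpolation
// between consecutive shape points.
func brokenLinear(shape []point) func(int64) int64 {
	return func(p int64) int64 {
		for i := 0; i < len(shape); i++ {
			if p <= shape[i].Utilization {
				if i == 0 {
					return shape[0].Score
				}
				return shape[i-1].Score + (shape[i].Score-shape[i-1].Score)*
					(p-shape[i-1].Utilization)/(shape[i].Utilization-shape[i-1].Utilization)
			}
		}
		return shape[len(shape)-1].Score
	}
}

func main() {
	// Hypothetical custom shape: score 80 at 50% utilization.
	score := brokenLinear([]point{{0, 100}, {50, 80}, {100, 0}})
	fmt.Println(score(25), score(50), score(75)) // 90 80 40
}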
SelectorSpreadPriority tries to spread pods that belong to the same Service, ReplicationController, ReplicaSet, or StatefulSet across different nodes. It consists of a Map and a Reduce:
func (s *SelectorSpread) CalculateSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
var selector labels.Selector
node := nodeInfo.Node()
if node == nil {
return framework.NodeScore{}, fmt.Errorf("node not found")
}
priorityMeta, ok := meta.(*priorityMetadata)
if ok {
selector = priorityMeta.podSelector
} else {
selector = getSelector(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister)
}
count := countMatchingPods(pod.Namespace, selector, nodeInfo)
return framework.NodeScore{
Name: node.Name,
Score: int64(count),
}, nil
}
For each node, the Map function counts the pods already on that node that match the incoming pod, where matching uses the combined selectors of its Service, ReplicaSet, and so on; that count is the raw score. A higher count means a higher Map score, but the meaning is inverted: more matching pods should yield a lower final score, and that inversion happens in Reduce.
func (s *SelectorSpread) CalculateSpreadPriorityReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister, result framework.NodeScoreList) error {
countsByZone := make(map[string]int64, 10)
maxCountByZone := int64(0)
maxCountByNodeName := int64(0)
for i := range result {
if result[i].Score > maxCountByNodeName {
maxCountByNodeName = result[i].Score
}
nodeInfo, err := sharedLister.NodeInfos().Get(result[i].Name)
if err != nil {
return err
}
zoneID := utilnode.GetZoneKey(nodeInfo.Node())
if zoneID == "" {
continue
}
countsByZone[zoneID] += result[i].Score
}
for zoneID := range countsByZone {
if countsByZone[zoneID] > maxCountByZone {
maxCountByZone = countsByZone[zoneID]
}
}
haveZones := len(countsByZone) != 0
maxCountByNodeNameFloat64 := float64(maxCountByNodeName)
maxCountByZoneFloat64 := float64(maxCountByZone)
MaxNodeScoreFloat64 := float64(framework.MaxNodeScore)
for i := range result {
// initializing to the default/max node score of maxPriority
fScore := MaxNodeScoreFloat64
if maxCountByNodeName > 0 {
fScore = MaxNodeScoreFloat64 * (float64(maxCountByNodeName-result[i].Score) / maxCountByNodeNameFloat64)
}
// If there is zone information present, incorporate it
if haveZones {
nodeInfo, err := sharedLister.NodeInfos().Get(result[i].Name)
if err != nil {
return err
}
zoneID := utilnode.GetZoneKey(nodeInfo.Node())
if zoneID != "" {
zoneScore := MaxNodeScoreFloat64
if maxCountByZone > 0 {
zoneScore = MaxNodeScoreFloat64 * (float64(maxCountByZone-countsByZone[zoneID]) / maxCountByZoneFloat64)
}
fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
}
}
result[i].Score = int64(fScore)
if klog.V(10) {
klog.Infof(
"%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, result[i].Name, int64(fScore),
)
}
}
return nil
}
The score formula in Reduce is:
fScore = MaxNodeScoreFloat64 * (float64(maxCountByNodeName-result[i].Score) / maxCountByNodeNameFloat64)
MaxNodeScoreFloat64 is 100. The subtraction (the maximum count minus this node's count, divided by the maximum count) means that the more matching pods a node already hosts, the lower its score.
The zone dimension is handled the same way, with a weighting: the node score above contributes 1/3 and the zone score contributes 2/3 (see the worked sketch below).
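A quick worked sketch with made-up pod counts (assuming zoneWeighting = 2/3, as described above):
package main

import "fmt"

const (
	maxNodeScore  = 100.0
	zoneWeighting = 2.0 / 3.0
)

// score applies the same "max count minus my count, over max count" formula.
func score(count, maxCount float64) float64 {
	return maxNodeScore * (maxCount - count) / maxCount
}

func main() {
	// Hypothetical: node-a hosts 1 matching pod, node-b hosts 3 (per-node max = 3).
	// Both nodes sit in the same zone, so the zone count is 4 and the zone score is 0.
	zoneScore := score(4, 4)                                      // 0
	fA := score(1, 3)*(1-zoneWeighting) + zoneWeighting*zoneScore // about 22
	fB := score(3, 3)*(1-zoneWeighting) + zoneWeighting*zoneScore // 0
	fmt.Printf("node-a=%.0f node-b=%.0f\n", fA, fB)
}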
ServiceSpreadingPriority works exactly like SelectorSpreadPriority, except that it only matches Services.
InterPodAffinityPriority controls which pods end up spread across different topology domains, based on preferred pod affinity and anti-affinity.
Registration:
scheduler.RegisterPriorityMapReduceFunction(priorities.InterPodAffinityPriority, priorities.CalculateInterPodAffinityPriorityMap, priorities.CalculateInterPodAffinityPriorityReduce, 1)
The Map function:
func CalculateInterPodAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
node := nodeInfo.Node()
if node == nil {
return framework.NodeScore{}, fmt.Errorf("node not found")
}
var topologyScore topologyPairToScore
if priorityMeta, ok := meta.(*priorityMetadata); ok {
topologyScore = priorityMeta.topologyScore
}
var score int64
for tpKey, tpValues := range topologyScore {
if v, exist := node.Labels[tpKey]; exist {
score += tpValues[v]
}
}
return framework.NodeScore{Name: node.Name, Score: score}, nil
}
The Map function checks whether the current node carries the label for each topology key; if it does, the score recorded for that key's value is added. The slightly more involved piece is buildTopologyPairToScore, which computes the per-topology-pair scores from the affinity and anti-affinity terms.
The Reduce function:
func CalculateInterPodAffinityPriorityReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister,
result framework.NodeScoreList) error {
var topologyScore topologyPairToScore
if priorityMeta, ok := meta.(*priorityMetadata); ok {
topologyScore = priorityMeta.topologyScore
}
if len(topologyScore) == 0 {
return nil
}
var maxCount, minCount int64
for i := range result {
score := result[i].Score
if score > maxCount {
maxCount = score
}
if score < minCount {
minCount = score
}
}
maxMinDiff := maxCount - minCount
for i := range result {
fScore := float64(0)
if maxMinDiff > 0 {
fScore = float64(framework.MaxNodeScore) * (float64(result[i].Score-minCount) / float64(maxMinDiff))
}
result[i].Score = int64(fScore)
}
return nil
}
Reduce computes fScore as:
fScore = float64(framework.MaxNodeScore) * (float64(result[i].Score-minCount) / float64(maxMinDiff))
This is essentially a min-max normalization of the Map scores; for example, Map scores of 0, 30, and 60 become 0, 50, and 100.
LeastRequestedPriority is registered as follows:
scheduler.RegisterPriorityMapReduceFunction(priorities.LeastRequestedPriority, priorities.LeastRequestedPriorityMap, nil, 1)
Similar to MostRequestedPriority; go straight to the score function:
func leastRequestedScore(requested, capacity int64) int64 {
if capacity == 0 {
return 0
}
if requested > capacity {
return 0
}
return ((capacity - requested) * int64(framework.MaxNodeScore)) / capacity
}
The larger the request, the lower the score.
BalancedResourceAllocation aims for balanced use of cpu, memory, and volumes.
Registration:
scheduler.RegisterPriorityMapReduceFunction(priorities.BalancedResourceAllocation, priorities.BalancedResourceAllocationMap, nil, 1)
The flow resembles MostRequestedPriority and LeastRequestedPriority; the interesting part is the score function balancedResourceScorer:
func balancedResourceScorer(requested, allocable ResourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
cpuFraction := fractionOfCapacity(requested[v1.ResourceCPU], allocable[v1.ResourceCPU])
memoryFraction := fractionOfCapacity(requested[v1.ResourceMemory], allocable[v1.ResourceMemory])
// This to find a node which has most balanced CPU, memory and volume usage.
if cpuFraction >= 1 || memoryFraction >= 1 {
// if requested >= capacity, the corresponding host should never be preferred.
return 0
}
if includeVolumes && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && allocatableVolumes > 0 {
volumeFraction := float64(requestedVolumes) / float64(allocatableVolumes)
if volumeFraction >= 1 {
// if requested >= capacity, the corresponding host should never be preferred.
return 0
}
// Compute variance for all the three fractions.
mean := (cpuFraction + memoryFraction + volumeFraction) / float64(3)
variance := float64((((cpuFraction - mean) * (cpuFraction - mean)) + ((memoryFraction - mean) * (memoryFraction - mean)) + ((volumeFraction - mean) * (volumeFraction - mean))) / float64(3))
// Since the variance is between positive fractions, it will be positive fraction. 1-variance lets the
// score to be higher for node which has least variance and multiplying it with 10 provides the scaling
// factor needed.
return int64((1 - variance) * float64(framework.MaxNodeScore))
}
// Upper and lower boundary of difference between cpuFraction and memoryFraction are -1 and 1
// respectively. Multiplying the absolute value of the difference by 10 scales the value to
// 0-10 with 0 representing well balanced allocation and 10 poorly balanced. Subtracting it from
// 10 leads to the score which also scales from 0 to 10 while 10 representing well balanced.
diff := math.Abs(cpuFraction - memoryFraction)
return int64((1 - diff) * float64(framework.MaxNodeScore))
}
BalancedResourceAllocation cannot be used on its own; it must be combined with LeastRequestedPriority. The "balance" here means that on a single node the requested fractions of cpu and memory should be as close as possible: if cpu requests amount to 50% of allocatable, memory requests should also be around 50%.
Volumes are also taken into account. Without volumes, the formula is:
diff := math.Abs(cpuFraction - memoryFraction)
return int64((1 - diff) * float64(framework.MaxNodeScore))
The smaller the difference, the higher the score, and vice versa.
With volumes, the formula becomes:
mean := (cpuFraction + memoryFraction + volumeFraction) / float64(3)
variance := float64((((cpuFraction - mean) * (cpuFraction - mean)) + ((memoryFraction - mean) * (memoryFraction - mean)) + ((volumeFraction - mean) * (volumeFraction - mean))) / float64(3))
return int64((1 - variance) * float64(framework.MaxNodeScore))
That is, the variance of the three requested-to-allocatable fractions: again, the smaller the variance, the higher the score, consistent with the diff case above. A worked sketch follows.
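A minimal worked sketch with made-up fractions (50% cpu, 70% memory, and, when volumes are counted, 60% volumes):
package main

import (
	"fmt"
	"math"
)

func main() {
	const maxNodeScore = 100.0

	// Without the volume branch: only the cpu/memory difference matters.
	cpuFraction, memoryFraction := 0.5, 0.7
	diff := math.Abs(cpuFraction - memoryFraction)
	fmt.Println(int64((1 - diff) * maxNodeScore)) // 80

	// With volumes: the variance of the three fractions is used instead.
	fractions := []float64{cpuFraction, memoryFraction, 0.6}
	mean := (fractions[0] + fractions[1] + fractions[2]) / 3
	var variance float64
	for _, f := range fractions {
		variance += (f - mean) * (f - mean)
	}
	variance /= 3
	fmt.Println(int64((1 - variance) * maxNodeScore)) // 99
}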
NodePreferAvoidPodsPriority scores nodes based on the annotation scheduler.alpha.kubernetes.io/preferAvoidPods:
func CalculateNodePreferAvoidPodsPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
node := nodeInfo.Node()
if node == nil {
return framework.NodeScore{}, fmt.Errorf("node not found")
}
var controllerRef *metav1.OwnerReference
if priorityMeta, ok := meta.(*priorityMetadata); ok {
controllerRef = priorityMeta.controllerRef
} else {
// We couldn't parse metadata - fallback to the podspec.
controllerRef = metav1.GetControllerOf(pod)
}
if controllerRef != nil {
// Ignore pods that are owned by other controller than ReplicationController
// or ReplicaSet.
if controllerRef.Kind != "ReplicationController" && controllerRef.Kind != "ReplicaSet" {
controllerRef = nil
}
}
if controllerRef == nil {
return framework.NodeScore{Name: node.Name, Score: framework.MaxNodeScore}, nil
}
avoids, err := v1helper.GetAvoidPodsFromNodeAnnotations(node.Annotations)
if err != nil {
// If we cannot get annotation, assume it's schedulable there.
return framework.NodeScore{Name: node.Name, Score: framework.MaxNodeScore}, nil
}
for i := range avoids.PreferAvoidPods {
avoid := &avoids.PreferAvoidPods[i]
if avoid.PodSignature.PodController.Kind == controllerRef.Kind && avoid.PodSignature.PodController.UID == controllerRef.UID {
return framework.NodeScore{Name: node.Name, Score: 0}, nil
}
}
return framework.NodeScore{Name: node.Name, Score: framework.MaxNodeScore}, nil
}
This simply checks whether the node's annotation matches the pod's controller, comparing PodSignature.PodController.Kind and PodSignature.PodController.UID. If they match, the score is 0; otherwise it is 100.
NodeAffinityPriority computes scores from node affinity; it is registered as:
scheduler.RegisterPriorityMapReduceFunction(priorities.NodeAffinityPriority, priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1)
The Map function:
func CalculateNodeAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
node := nodeInfo.Node()
if node == nil {
return framework.NodeScore{}, fmt.Errorf("node not found")
}
// default is the podspec.
affinity := pod.Spec.Affinity
if priorityMeta, ok := meta.(*priorityMetadata); ok {
// We were able to parse metadata, use affinity from there.
affinity = priorityMeta.affinity
}
var count int32
// A nil element of PreferredDuringSchedulingIgnoredDuringExecution matches no objects.
// An element of PreferredDuringSchedulingIgnoredDuringExecution that refers to an
// empty PreferredSchedulingTerm matches all objects.
if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
// Match PreferredDuringSchedulingIgnoredDuringExecution term by term.
for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
preferredSchedulingTerm := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]
if preferredSchedulingTerm.Weight == 0 {
continue
}
// TODO: Avoid computing it for all nodes if this becomes a performance problem.
nodeSelector, err := v1helper.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)
if err != nil {
return framework.NodeScore{}, err
}
if nodeSelector.Matches(labels.Set(node.Labels)) {
count += preferredSchedulingTerm.Weight
}
}
}
return framework.NodeScore{
Name: node.Name,
Score: int64(count),
}, nil
}
The pod must have nodeAffinity.preferredDuringSchedulingIgnoredDuringExecution terms; for every preferredSchedulingTerm whose selector matches the node's labels, that term's weight is added to the score.
Reduce is much simpler:
var CalculateNodeAffinityPriorityReduce = NormalizeReduce(framework.MaxNodeScore, false)
func NormalizeReduce(maxPriority int64, reverse bool) PriorityReduceFunction {
return func(
_ *v1.Pod,
_ interface{},
_ schedulerlisters.SharedLister,
result framework.NodeScoreList) error {
var maxCount int64
for i := range result {
if result[i].Score > maxCount {
maxCount = result[i].Score
}
}
if maxCount == 0 {
if reverse {
for i := range result {
result[i].Score = maxPriority
}
}
return nil
}
for i := range result {
score := result[i].Score
score = maxPriority * score / maxCount
if reverse {
score = maxPriority - score
}
result[i].Score = score
}
return nil
}
}
The scores are normalized into the 0-100 range; the key formula is:
score = maxPriority * score / maxCount
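To see both uses of NormalizeReduce concretely (the scores below are made up), here is a small self-contained sketch; it also covers the reverse=true case that TaintTolerationPriority uses next:
package main

import "fmt"

// normalize mirrors NormalizeReduce above, for a plain slice of scores.
func normalize(scores []int64, maxPriority int64, reverse bool) {
	var maxCount int64
	for _, s := range scores {
		if s > maxCount {
			maxCount = s
		}
	}
	if maxCount == 0 {
		if reverse {
			for i := range scores {
				scores[i] = maxPriority
			}
		}
		return
	}
	for i, s := range scores {
		s = maxPriority * s / maxCount
		if reverse {
			s = maxPriority - s
		}
		scores[i] = s
	}
}

func main() {
	// NodeAffinity-style Map scores: node-a matched a weight-10 term,
	// node-b matched weight-10 and weight-30 terms.
	affinity := []int64{10, 40}
	normalize(affinity, 100, false)
	fmt.Println(affinity) // [25 100]

	// TaintToleration-style Map scores (count of intolerable PreferNoSchedule
	// taints), with reverse=true: more taints means a lower final score.
	taints := []int64{0, 2, 4}
	normalize(taints, 100, true)
	fmt.Println(taints) // [100 50 0]
}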
TaintTolerationPriority scores nodes by their taints with effect PreferNoSchedule and the pod's tolerations.
Registration:
scheduler.RegisterPriorityMapReduceFunction(priorities.TaintTolerationPriority, priorities.ComputeTaintTolerationPriorityMap, priorities.ComputeTaintTolerationPriorityReduce, 1)
The Map function:
func ComputeTaintTolerationPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
node := nodeInfo.Node()
if node == nil {
return framework.NodeScore{}, fmt.Errorf("node not found")
}
// To hold all the tolerations with Effect PreferNoSchedule
var tolerationsPreferNoSchedule []v1.Toleration
if priorityMeta, ok := meta.(*priorityMetadata); ok {
tolerationsPreferNoSchedule = priorityMeta.podTolerations
} else {
tolerationsPreferNoSchedule = getAllTolerationPreferNoSchedule(pod.Spec.Tolerations)
}
return framework.NodeScore{
Name: node.Name,
Score: int64(countIntolerableTaintsPreferNoSchedule(node.Spec.Taints, tolerationsPreferNoSchedule)),
}, nil
}
The Map score is the number of PreferNoSchedule taints on the node that the pod cannot tolerate. The more of them there are, the lower the final score should be; that inversion happens in Reduce:
var ComputeTaintTolerationPriorityReduce = NormalizeReduce(framework.MaxNodeScore, true)
Reduce again normalizes, this time with reverse set to true, so the higher the Map score, the lower the final score.
As its name suggests, ImageLocalityPriority scores a node by whether the images the pod needs are already present on it.
Registration:
scheduler.RegisterPriorityMapReduceFunction(priorities.ImageLocalityPriority, priorities.ImageLocalityPriorityMap, nil, 1)
The Map function:
func ImageLocalityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
node := nodeInfo.Node()
if node == nil {
return framework.NodeScore{}, fmt.Errorf("node not found")
}
var score int
if priorityMeta, ok := meta.(*priorityMetadata); ok {
score = calculatePriority(sumImageScores(nodeInfo, pod.Spec.Containers, priorityMeta.totalNumNodes))
} else {
// if we are not able to parse priority meta data, skip this priority
score = 0
}
return framework.NodeScore{
Name: node.Name,
Score: int64(score),
}, nil
}
The score in Map is computed as:
score = calculatePriority(sumImageScores(nodeInfo, pod.Spec.Containers, priorityMeta.totalNumNodes))
so the functions to look at are calculatePriority and sumImageScores.
sumImageScores:
func sumImageScores(nodeInfo *schedulernodeinfo.NodeInfo, containers []v1.Container, totalNumNodes int) int64 {
var sum int64
imageStates := nodeInfo.ImageStates()
for _, container := range containers {
if state, ok := imageStates[normalizedImageName(container.Image)]; ok {
sum += scaledImageScore(state, totalNumNodes)
}
}
return sum
}
func scaledImageScore(imageState *schedulernodeinfo.ImageStateSummary, totalNumNodes int) int64 {
spread := float64(imageState.NumNodes) / float64(totalNumNodes)
return int64(float64(imageState.Size) * spread)
}
sumImageScores checks, for every container of the pod (initContainers are excluded on purpose, since their images are usually small), whether the image is already present on the node, and sums a per-image score. How is each image scored? By its size multiplied by its spread, the fraction of nodes that already hold the image. Larger and more widely spread images score higher; size dominates, and the spread is only a scaling factor, introduced so that pods are not always scheduled onto one node or a handful of nodes (the "node heating problem").
calculatePriority:
func calculatePriority(sumScores int64) int {
if sumScores < minThreshold {
sumScores = minThreshold
} else if sumScores > maxThreshold {
sumScores = maxThreshold
}
return int(int64(framework.MaxNodeScore) * (sumScores - minThreshold) / (maxThreshold - minThreshold))
}
calculatePriority normalizes the sum and clamps it with thresholds: sums below 23MB or above 1000MB are pinned to the respective boundary. A worked example follows.
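A worked sketch with a made-up image (500MB, already present on 5 of 10 nodes), using the 23MB and 1000MB thresholds mentioned above:
package main

import "fmt"

const (
	mb           int64 = 1024 * 1024
	minThreshold       = 23 * mb
	maxThreshold       = 1000 * mb
	maxNodeScore int64 = 100
)

// Mirrors calculatePriority above.
func calculatePriority(sumScores int64) int64 {
	if sumScores < minThreshold {
		sumScores = minThreshold
	} else if sumScores > maxThreshold {
		sumScores = maxThreshold
	}
	return maxNodeScore * (sumScores - minThreshold) / (maxThreshold - minThreshold)
}

func main() {
	imageSize := 500 * mb
	spread := 5.0 / 10.0                      // image present on half of the nodes
	sum := int64(float64(imageSize) * spread) // scaledImageScore -> 250MB
	fmt.Println(calculatePriority(sum))       // 100*(250-23)/(1000-23), roughly 23
}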
ResourceLimitsPriority checks whether the node's resources can satisfy the pod's limits:
func ResourceLimitsPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
node := nodeInfo.Node()
if node == nil {
return framework.NodeScore{}, fmt.Errorf("node not found")
}
allocatableResources := nodeInfo.AllocatableResource()
// compute pod limits
var podLimits *schedulernodeinfo.Resource
if priorityMeta, ok := meta.(*priorityMetadata); ok && priorityMeta != nil {
// We were able to parse metadata, use podLimits from there.
podLimits = priorityMeta.podLimits
} else {
// We couldn't parse metadata - fallback to computing it.
podLimits = getResourceLimits(pod)
}
cpuScore := computeScore(podLimits.MilliCPU, allocatableResources.MilliCPU)
memScore := computeScore(podLimits.Memory, allocatableResources.Memory)
score := int64(0)
if cpuScore == 1 || memScore == 1 {
score = 1
}
... ...
return framework.NodeScore{
Name: node.Name,
Score: score,
}, nil
}
func computeScore(limit, allocatable int64) int64 {
if limit != 0 && allocatable != 0 && limit <= allocatable {
return 1
}
return 0
}
If the node's allocatable cpu or memory is at least the corresponding limit, the score is a flat 1; otherwise it is 0.
EvenPodsSpreadPriority is built around topology spread constraints (see the referenced article).
A constraint defines at which level (node, availability zone, region) your pods should be spread out; for example, you can require that your pods be distributed evenly across regions.
Registration:
scheduler.RegisterPriorityMapReduceFunction(
priorities.EvenPodsSpreadPriority,
priorities.CalculateEvenPodsSpreadPriorityMap,
priorities.CalculateEvenPodsSpreadPriorityReduce,
1,
)
The Map function:
func CalculateEvenPodsSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
node := nodeInfo.Node()
if node == nil {
return framework.NodeScore{}, fmt.Errorf("node not found")
}
var m *podTopologySpreadMap
if priorityMeta, ok := meta.(*priorityMetadata); ok {
m = priorityMeta.podTopologySpreadMap
}
if m == nil {
return framework.NodeScore{Name: node.Name, Score: 0}, nil
}
// no need to continue if the node is not qualified.
if _, ok := m.nodeNameSet[node.Name]; !ok {
return framework.NodeScore{Name: node.Name, Score: 0}, nil
}
// For each present <pair>, current node gets a credit of <matchSum>.
// And we sum up <matchSum> and return it as this node's score.
var score int64
for _, c := range m.constraints {
if tpVal, ok := node.Labels[c.topologyKey]; ok {
pair := topologyPair{key: c.topologyKey, value: tpVal}
matchSum := *m.topologyPairToPodCounts[pair]
score += matchSum
}
}
return framework.NodeScore{Name: node.Name, Score: score}, nil
}
Map mainly counts, for the node, how many existing pods match the topology spread constraints. More matches yield a higher raw score, but what we actually want is balance, so the score is flipped in Reduce.
The Reduce function:
func CalculateEvenPodsSpreadPriorityReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister,
result framework.NodeScoreList) error {
var m *podTopologySpreadMap
if priorityMeta, ok := meta.(*priorityMetadata); ok {
m = priorityMeta.podTopologySpreadMap
}
if m == nil {
return nil
}
// Calculate the summed <total> score and <minScore>.
var minScore int64 = math.MaxInt64
var total int64
for _, score := range result {
// it's mandatory to check if <score.Name> is present in m.nodeNameSet
if _, ok := m.nodeNameSet[score.Name]; !ok {
continue
}
total += score.Score
if score.Score < minScore {
minScore = score.Score
}
}
maxMinDiff := total - minScore
for i := range result {
nodeInfo, err := sharedLister.NodeInfos().Get(result[i].Name)
if err != nil {
return err
}
node := nodeInfo.Node()
// Debugging purpose: print the score for each node.
// Score must be a pointer here, otherwise it's always 0.
if klog.V(10) {
defer func(score *int64, nodeName string) {
klog.Infof("%v -> %v: PodTopologySpread NormalizeScore, Score: (%d)", pod.Name, nodeName, *score)
}(&result[i].Score, node.Name)
}
if maxMinDiff == 0 {
result[i].Score = framework.MaxNodeScore
continue
}
if _, ok := m.nodeNameSet[node.Name]; !ok {
result[i].Score = 0
continue
}
flippedScore := total - result[i].Score
fScore := float64(framework.MaxNodeScore) * (float64(flippedScore) / float64(maxMinDiff))
result[i].Score = int64(fScore)
}
return nil
}
Reduce normalizes the Map scores and flips them.
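A small worked sketch of the flip-and-normalize step, with made-up matching-pod counts for two qualified nodes:
package main

import "fmt"

func main() {
	const maxNodeScore = 100.0

	// Hypothetical Map scores: node-a matches 3 pods, node-b matches 1.
	scores := []int64{3, 1}

	total, minScore := int64(0), scores[0]
	for _, s := range scores {
		total += s
		if s < minScore {
			minScore = s
		}
	}
	maxMinDiff := total - minScore // 4 - 1 = 3

	for i, s := range scores {
		flipped := total - s // fewer matching pods -> larger flipped score
		scores[i] = int64(maxNodeScore * float64(flipped) / float64(maxMinDiff))
	}
	fmt.Println(scores) // [33 100]
}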
On top of the nodes that passed the predicates, the priority phase computes a score for each node; each algorithm's score is multiplied by its weight and summed into the final total. Compared with the predicates, which only answer "schedulable or not", the priorities bring in many more scoring strategies.