| Predicates | Priorities |
| --- | --- |
| CheckNodeUnschedulablePred, GeneralPred, HostNamePred, PodFitsHostPortsPred, MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred, PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred, CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, MaxCSIVolumeCountPred, MaxAzureDiskVolumeCountPred, MaxCinderVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred, EvenPodsSpreadPred, MatchInterPodAffinityPred | EqualPriority, MostRequestedPriority, RequestedToCapacityRatioPriority, SelectorSpreadPriority, ServiceSpreadingPriority, InterPodAffinityPriority, LeastRequestedPriority, BalancedResourceAllocation, NodePreferAvoidPodsPriority, NodeAffinityPriority, TaintTolerationPriority, ImageLocalityPriority, ResourceLimitsPriority, EvenPodsSpreadPriority |
```go
func main() {
	... ...
	command := app.NewSchedulerCommand()
	... ...
	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}
```
The entry function calls NewSchedulerCommand. All Kubernetes components are built as command-line programs in the same way (see cobra): NewSchedulerCommand, shown below, returns a *cobra.Command, and the entry function then runs it via Execute().
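Before diving into the real NewSchedulerCommand, here is a minimal, self-contained cobra sketch of that pattern (names such as `newDemoCommand` and the `--config` flag are made up for the example, not taken from the scheduler source): a command is constructed with a `Run` callback holding the real work, and `Execute()` parses flags and dispatches to it.

```go
package main

import (
	"fmt"
	"os"

	"github.com/spf13/cobra"
)

// newDemoCommand mirrors the shape of NewSchedulerCommand: build a
// *cobra.Command whose Run callback contains the component's real work.
func newDemoCommand() *cobra.Command {
	var configFile string
	cmd := &cobra.Command{
		Use:  "demo-scheduler",
		Long: "A toy command built the same way kube-scheduler's command is.",
		Run: func(cmd *cobra.Command, args []string) {
			// In kube-scheduler this is where runCommand / Run are invoked.
			fmt.Println("running with config:", configFile)
		},
	}
	cmd.Flags().StringVar(&configFile, "config", "", "path to a config file")
	return cmd
}

func main() {
	// Execute parses flags and dispatches to the Run callback,
	// just like command.Execute() in kube-scheduler's main().
	if err := newDemoCommand().Execute(); err != nil {
		os.Exit(1)
	}
}
```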
```go
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
	opts, err := options.NewOptions()
	... ...
	cmd := &cobra.Command{
		Use:  "kube-scheduler",
		Long: `... ... `,
		Run: func(cmd *cobra.Command, args []string) {
			if err := runCommand(cmd, args, opts, registryOptions...); err != nil {
				... ...
			}
		},
	}
	fs := cmd.Flags()
	namedFlagSets := opts.Flags()
	verflag.AddFlags(namedFlagSets.FlagSet("global"))
	globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
	for _, f := range namedFlagSets.FlagSets {
		fs.AddFlagSet(f)
	}
	... ...
	return cmd
}

// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, args []string, opts *options.Options, registryOptions ...Option) error {
	... ...
	return Run(ctx, cc, registryOptions...)
}

// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, outOfTreeRegistryOptions ...Option) error {
	... ...
	// Create the scheduler.
	sched, err := scheduler.New(cc.Client,
		cc.InformerFactory,
		cc.PodInformer,
		cc.Recorder,
		ctx.Done(),
		scheduler.WithName(cc.ComponentConfig.SchedulerName),
		scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
		scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
		scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
		scheduler.WithBindTimeoutSeconds(cc.ComponentConfig.BindTimeoutSeconds),
		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
		scheduler.WithFrameworkPlugins(cc.ComponentConfig.Plugins),
		scheduler.WithFrameworkPluginConfig(cc.ComponentConfig.PluginConfig),
		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
	)
	... ...
	// Prepare the event broadcaster.
	if cc.Broadcaster != nil && cc.EventClient != nil {
		cc.Broadcaster.StartRecordingToSink(ctx.Done())
	}
	if cc.CoreBroadcaster != nil && cc.CoreEventClient != nil {
		cc.CoreBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")})
	}
	... ...
	// Start all informers.
	go cc.PodInformer.Informer().Run(ctx.Done())
	cc.InformerFactory.Start(ctx.Done())
	... ...
	// If leader election is enabled, runCommand via LeaderElector until done and exit.
	if cc.LeaderElection != nil {
		cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
			OnStartedLeading: sched.Run,
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		}
		... ...
		leaderElector.Run(ctx)
		return fmt.Errorf("lost lease")
	}

	// Leader election is disabled, so runCommand inline until done.
	sched.Run(ctx)
	return fmt.Errorf("finished without leader elect")
}
```
There are two main public functions here: NewSchedulerCommand and Run. The command.Execute() call in the entry function ends up running runCommand, which in turn calls Run.
```go
// New returns a Scheduler
func New(client clientset.Interface,
	... ...
	options := defaultSchedulerOptions
	... ...
	configurator := &Configurator{
		... ...
	}
	var sched *Scheduler
	source := options.schedulerAlgorithmSource
	switch {
	case source.Provider != nil:
		// Create the config from a named algorithm provider.
		sc, err := configurator.CreateFromProvider(*source.Provider)
		... ...
	case source.Policy != nil:
		// Create the config from a user specified policy source.
		... ...
		sc, err := configurator.CreateFromConfig(*policy)
		... ...
		sched = sc
	}
	AddAllEventHandlers(sched, options.schedulerName, informerFactory, podInformer)
	return sched, nil
}
```
New converts the options into a Configurator and then builds the scheduler from the configured algorithm source (the set of predicate and priority algorithms), either from a named provider or from a user-specified policy config. The policy/config path ultimately calls CreateFromKeys, which selects the specific algorithms by their registered keys.
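As a rough illustration of that "select by key" idea, here is a minimal sketch (the registries, names, and `createFromKeys` helper below are hypothetical simplifications, not the scheduler's real types): predicates and priorities are registered in maps, and a provider or policy just names the keys it wants.

```go
package main

import "fmt"

// Hypothetical simplified algorithm types.
type fitPredicate func(pod, node string) bool
type priorityFunc func(pod, node string) int

// Hypothetical registries keyed by algorithm name.
var predicateRegistry = map[string]fitPredicate{
	"PodFitsResourcesPred": func(pod, node string) bool { return true },
	"PodFitsHostPortsPred": func(pod, node string) bool { return true },
}

var priorityRegistry = map[string]priorityFunc{
	"LeastRequestedPriority": func(pod, node string) int { return 5 },
}

// createFromKeys mirrors the idea behind Configurator.CreateFromKeys:
// pick exactly the algorithms whose keys the provider/policy lists.
func createFromKeys(predKeys, prioKeys []string) ([]fitPredicate, []priorityFunc) {
	var preds []fitPredicate
	for _, k := range predKeys {
		if p, ok := predicateRegistry[k]; ok {
			preds = append(preds, p)
		}
	}
	var prios []priorityFunc
	for _, k := range prioKeys {
		if p, ok := priorityRegistry[k]; ok {
			prios = append(prios, p)
		}
	}
	return preds, prios
}

func main() {
	preds, prios := createFromKeys([]string{"PodFitsResourcesPred"}, []string{"LeastRequestedPriority"})
	fmt.Println(len(preds), "predicates,", len(prios), "priorities selected")
}
```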
```go
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
	fwk := sched.Framework
	podInfo := sched.NextPod()
	... ...
	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod)
	if err != nil {
		... ...
		if sched.DisablePreemption {
			... ...
		} else {
			... ...
			sched.preempt(schedulingCycleCtx, state, fwk, pod, fitError)
		}
	}
	... ...
	// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
	err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
	... ...
	// Binding runs asynchronously in its own goroutine.
	go func() {
		... ...
		err := sched.bind(bindingCycleCtx, assumedPod, scheduleResult.SuggestedHost, state)
		... ...
	}()
}
```
```go
// sched.bind ultimately goes through the default binder, which issues the
// Bind subresource call against the API server:
b.Client.CoreV1().Pods(binding.Namespace).Bind(binding)
```
```go
// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
	... ...
	filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(ctx, state, pod)
	... ...
	// When only one node after predicate, just use it.
	if len(filteredNodes) == 1 {
		... ...
		return ScheduleResult{
			SuggestedHost:  filteredNodes[0].Name,
			EvaluatedNodes: 1 + len(failedPredicateMap) + len(filteredNodesStatuses),
			FeasibleNodes:  1,
		}, nil
	}
	... ...
	priorityList, err := g.prioritizeNodes(ctx, state, pod, metaPrioritiesInterface, filteredNodes)
	... ...
	host, err := g.selectHost(priorityList)
	trace.Step("Prioritizing done")
	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap) + len(filteredNodesStatuses),
		FeasibleNodes:  len(filteredNodes),
	}, err
}
```
```go
// Filters the nodes to find the ones that fit based on the given predicate functions
// Each node is passed through the predicate functions to determine if it is a fit
func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) {
	var filtered []*v1.Node
	... ...
	checkNode := func(i int) {
		// We check the nodes starting from where we left off in the previous scheduling cycle,
		// this is to make sure all nodes have the same chance of being examined across pods.
		nodeInfo := g.nodeInfoSnapshot.NodeInfoList[(g.nextStartNodeIndex+i)%allNodes]
		fits, failedPredicates, status, err := g.podFitsOnNode(
			ctx,
			state,
			pod,
			meta,
			nodeInfo,
			g.alwaysCheckAllPredicates,
		)
		... ...
		if fits {
			length := atomic.AddInt32(&filteredLen, 1)
			if length > numNodesToFind {
				cancel()
				atomic.AddInt32(&filteredLen, -1)
			} else {
				filtered[length-1] = nodeInfo.Node()
			}
		}
		... ...
	}

	// Stops searching for more nodes once the configured number of feasible nodes
	// are found.
	workqueue.ParallelizeUntil(ctx, 16, allNodes, checkNode)
	... ...
	if len(filtered) > 0 && len(g.extenders) != 0 {
		for _, extender := range g.extenders {
			... ...
			filteredList, failedMap, err := extender.Filter(pod, filtered, g.nodeInfoSnapshot.NodeInfoMap)
			... ...
		}
	}
	return filtered, failedPredicateMap, filteredNodesStatuses, nil
}
```
```go
func (g *genericScheduler) prioritizeNodes(
	ctx context.Context,
	state *framework.CycleState,
	pod *v1.Pod,
	meta interface{},
	nodes []*v1.Node,
) (framework.NodeScoreList, error) {
	// If no priority configs are provided, then all nodes will have a score of one.
	// This is required to generate the priority list in the required format
	if len(g.prioritizers) == 0 && len(g.extenders) == 0 && !g.framework.HasScorePlugins() {
		... ...
	}
	workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
		nodeInfo := g.nodeInfoSnapshot.NodeInfoMap[nodes[index].Name]
		for i := range g.prioritizers {
			var err error
			results[i][index], err = g.prioritizers[i].Map(pod, meta, nodeInfo)
			if err != nil {
				appendError(err)
				results[i][index].Name = nodes[index].Name
			}
		}
	})
	for i := range g.prioritizers {
		if g.prioritizers[i].Reduce == nil {
			continue
		}
		wg.Add(1)
		go func(index int) {
			metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Inc()
			defer func() {
				metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Dec()
				wg.Done()
			}()
			if err := g.prioritizers[index].Reduce(pod, meta, g.nodeInfoSnapshot, results[index]); err != nil {
				appendError(err)
			}
			... ...
		}(i)
	}
	... ...
	for i := range nodes {
		result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
		for j := range g.prioritizers {
			result[i].Score += results[j][i].Score * g.prioritizers[j].Weight
		}
		for j := range scoresMap {
			result[i].Score += scoresMap[j][i].Score
		}
	}
	if len(g.extenders) != 0 && nodes != nil {
		... ...
		go func(extIndex int) {
			... ...
			prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes)
			... ...
			for i := range *prioritizedList {
				host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
				... ...
				combinedScores[host] += score * weight
			}
			mu.Unlock()
		}(i)
	}
	... ...
	return result, nil
}
```
The priority (scoring) phase follows roughly the same pattern as the predicate phase: scores are computed in parallel by 16 goroutines, each priority function yields a score in the 0-10 range, and the per-algorithm scores are summed into a node's total. The computation borrows the Map-Reduce idea, and extenders can also contribute scores with their own weights.
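To make that Map-Reduce scoring flow concrete, here is a simplified, self-contained sketch (all types and names below are made up for illustration, not the scheduler's real ones): each prioritizer has a Map step that scores one node, an optional Reduce step that post-processes the whole list (e.g. normalizes into 0-10), and the final score is the weight-multiplied sum across prioritizers.

```go
package main

import "fmt"

// Hypothetical, simplified model of the prioritizer Map-Reduce flow.
type nodeScore struct {
	Name  string
	Score int64
}

type prioritizer struct {
	Name   string
	Weight int64
	// Map scores a single node; Reduce may post-process the whole list.
	Map    func(node string) int64
	Reduce func(scores []nodeScore)
}

func prioritizeNodes(nodes []string, prioritizers []prioritizer) []nodeScore {
	// Map phase: one score list per prioritizer (done with 16 parallel
	// workers in the real scheduler; kept sequential here for clarity).
	results := make([][]nodeScore, len(prioritizers))
	for i, p := range prioritizers {
		results[i] = make([]nodeScore, len(nodes))
		for j, n := range nodes {
			results[i][j] = nodeScore{Name: n, Score: p.Map(n)}
		}
		// Reduce phase: optional normalization per prioritizer.
		if p.Reduce != nil {
			p.Reduce(results[i])
		}
	}
	// Final score: weighted sum across all prioritizers.
	total := make([]nodeScore, len(nodes))
	for j, n := range nodes {
		total[j] = nodeScore{Name: n}
		for i, p := range prioritizers {
			total[j].Score += results[i][j].Score * p.Weight
		}
	}
	return total
}

func main() {
	ps := []prioritizer{
		{Name: "LeastRequested", Weight: 1, Map: func(node string) int64 { return int64(len(node)) % 11 }},
		{Name: "ImageLocality", Weight: 2, Map: func(node string) int64 { return 5 }},
	}
	fmt.Println(prioritizeNodes([]string{"node-a", "node-b"}, ps))
}
```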
```go
func (g *genericScheduler) Preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
	// Scheduler may return various types of errors. Consider preemption only if
	// the error is of type FitError.
	fitError, ok := scheduleErr.(*FitError)
	if !ok || fitError == nil {
		return nil, nil, nil, nil
	}
	if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfoMap, g.enableNonPreempting) {
		klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
		return nil, nil, nil, nil
	}
	if len(g.nodeInfoSnapshot.NodeInfoMap) == 0 {
		return nil, nil, nil, ErrNoNodesAvailable
	}
	potentialNodes := nodesWherePreemptionMightHelp(g.nodeInfoSnapshot.NodeInfoMap, fitError)
	if len(potentialNodes) == 0 {
		klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
		// In this case, we should clean-up any existing nominated node name of the pod.
		return nil, nil, []*v1.Pod{pod}, nil
	}
	var (
		pdbs []*policy.PodDisruptionBudget
		err  error
	)
	if g.pdbLister != nil {
		pdbs, err = g.pdbLister.List(labels.Everything())
		if err != nil {
			return nil, nil, nil, err
		}
	}
	nodeToVictims, err := g.selectNodesForPreemption(ctx, state, pod, potentialNodes, pdbs)
	if err != nil {
		return nil, nil, nil, err
	}
	// We will only check nodeToVictims with extenders that support preemption.
	// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
	// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
	nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
	if err != nil {
		return nil, nil, nil, err
	}
	candidateNode := pickOneNodeForPreemption(nodeToVictims)
	if candidateNode == nil {
		return nil, nil, nil, nil
	}
	// Lower priority pods nominated to run on this node, may no longer fit on
	// this node. So, we should remove their nomination. Removing their
	// nomination updates these pods and moves them to the active queue. It
	// lets scheduler find another place for them.
	nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
	if nodeInfo, ok := g.nodeInfoSnapshot.NodeInfoMap[candidateNode.Name]; ok {
		return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, nil
	}
	return nil, nil, nil, fmt.Errorf(
		"preemption failed: the target node %s has been deleted from scheduler cache",
		candidateNode.Name)
}
```
The scheduler code base is not large and its flow and design are fairly easy to follow, although there are many details that this article does not cover. In addition, 1.17.0-rc.1 already contains a good deal of scheduling framework code: custom plugins can be registered and are invoked at the corresponding points of the scheduling flow, which makes the scheduler considerably more flexible.
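To give a feel for that plugin idea, here is a deliberately simplified, self-contained sketch (the interfaces and names below are hypothetical stand-ins, not the real framework package types): a plugin implements a filter and/or score hook, and the framework walks every registered plugin at the matching step of the scheduling cycle.

```go
package main

import "fmt"

// Hypothetical, simplified stand-ins for the framework's extension points.
type filterPlugin interface {
	Name() string
	Filter(pod, node string) bool
}

type scorePlugin interface {
	Name() string
	Score(pod, node string) int64
}

// zonePlugin is a toy custom plugin implementing both extension points.
type zonePlugin struct{}

func (zonePlugin) Name() string { return "ZoneSpread" }

// Filter accepts every non-empty node name (purely illustrative logic).
func (zonePlugin) Filter(pod, node string) bool { return len(node) > 0 }

// Score prefers shorter node names, just to return something deterministic.
func (zonePlugin) Score(pod, node string) int64 { return int64(100 - len(node)) }

// runFilters and runScores mimic how the framework calls its registered
// plugins at the Filter and Score steps of a scheduling cycle.
func runFilters(plugins []filterPlugin, pod, node string) bool {
	for _, p := range plugins {
		if !p.Filter(pod, node) {
			return false
		}
	}
	return true
}

func runScores(plugins []scorePlugin, pod, node string) int64 {
	var total int64
	for _, p := range plugins {
		total += p.Score(pod, node)
	}
	return total
}

func main() {
	p := zonePlugin{}
	fmt.Println(runFilters([]filterPlugin{p}, "nginx", "node-a"), runScores([]scorePlugin{p}, "nginx", "node-a"))
}
```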