前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >Kubernetes源码学习笔记------api-server的请求管理和调度(1)

Kubernetes源码学习笔记------api-server的请求管理和调度(1)

原创
作者头像
北京柴道
发布2024-12-09 16:04:22
发布2024-12-09 16:04:22
700
举报

api-server简介

在Kubernetes中,api-server起到了信息枢纽的关键作用。从kubectl,kube-scheduler,kubelel,etcd,等组件都需要与api-server交互,完成对集群的监控和操作。因此整个Kubernetes系统对于api-server的稳定性要求是极高的。为了防止大量请求让api-server宕机,Kubernetes的创作者采用了非常复杂的请求调度逻辑,让api-server在保证请求请求响应时效性的同时,公平地让每一个请求有被处理的机会,同时通过限流措施也保证api-server不会被大量的请求压垮。接下来就让我们展开Kubernetes的代码,了解api-server是如何进行限流和请求调度的。

api-server的请求管理

api-server基于多个先进先出队列对请求进行管理,采用负载最小-最先到达的原则对请求进行调度。

基本数据结构

api-server采用一个缓冲区数组对请求进行管理。在文件staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go中定义的结构体queueSet是负责管理请求的核心结构:

代码语言:go
复制
type queueSet struct {
    //......

	// queues may be longer than the desired number, while the excess
	// queues are still draining.
	queues []*queue

    //......
}

type queue struct {
	// The requestsWaiting not yet executing in the real world are stored in a FIFO list.
	requestsWaiting fifo

	// nextDispatchR is the R progress meter reading at
	// which the next request will be dispatched in the virtual world.
	nextDispatchR fcrequest.SeatSeconds

	// requestsExecuting is the set of requests executing in the real world.
	requestsExecuting sets.Set[*request]

	// index is the position of this queue among those in its queueSet.
	index int

	// seatsInUse is the total number of "seats" currently occupied
	// by all the requests that are currently executing in this queue.
	seatsInUse int
}

queueSet中的成员变量queues是一个由queue组成的数组,每个queue中装载着外部请求。

queue中采用一个双向链表(fifo)管理请求,这主要是因为新的请求需要从队列后端插入,而在调度阶段,请求需要从队列头部选取。在众多数据结构中,双向链表在索引头尾和尾部插入这两项任务中性能优秀,因此Kubernetes选用了双向链表来存储待调度的请求。

新请求如何被处理

下图展示了新请求如何加入到调度队列中。首先Kubernetes需要计算新请求产生的工作负载(这部分内容会在接下来的文章中详细介绍),接着程序遍历全部的队列,找到目前总负载最小的队列,将新请求加入其中。

在文件staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go中定义的shuffleShardAndRejectOrEnqueueLocked函数实现了这一过程,在shuffleShardLocked阶段,选取负载最小的队列索引,然后在rejectOrEnqueueToBoundLocked中检查负载最小的队列中是否有位置给目前的请求,如果没有,在返回nil,表示这个请求无法被处理,向客户端返回HTTP状态码429,表示服务器目前负载过大。

代码语言:go
复制
func (qs *queueSet) shuffleShardAndRejectOrEnqueueLocked(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
	// Start with the shuffle sharding, to pick a queue.
	queueIdx := qs.shuffleShardLocked(hashValue, descr1, descr2)
	queue := qs.queues[queueIdx]

	defer qs.boundNextDispatchLocked(queue)

	// Create a request and enqueue
	req := &request{
		qs:                qs,
		fsName:            fsName,
		flowDistinguisher: flowDistinguisher,
		ctx:               ctx,
		decision:          qs.promiseFactory(nil, ctx, decisionCancel),
		arrivalTime:       qs.clock.Now(),
		arrivalR:          qs.currentR,
		queue:             queue,
		descr1:            descr1,
		descr2:            descr2,
		queueNoteFn:       queueNoteFn,
		workEstimate:      qs.completeWorkEstimate(workEstimate),
	}
	if ok := qs.rejectOrEnqueueToBoundLocked(req); !ok {
		return nil
	}
	metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, queue.requestsWaiting.Length())
	return req
}

请求调度策略

对于请求的调度,Kubernetes的原则是按照请求到达的先后顺序,在保证时效性的同时,兼顾公平。

下图的例子展示了一个调度过程。

  1. 遍历集合中每个队列的头元素(即队列中最早抵达的元素)。
  2. 找到对系统负载最小的那个请求。
  3. 执行这个请求,并释放请求占用的空间。
  4. 重复执行上述过程。

在文件staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go中定义了这个算法的全部细节。

代码语言:go
复制
// findDispatchQueueToBoundLocked examines the queues in round robin order and
// returns the first one of those for which the virtual finish time of
// the oldest waiting request is minimal, and also returns that request.
// Returns nils if the head of the selected queue can not be dispatched now,
// in which case the caller does not need to follow up with`qs.boundNextDispatchLocked`.
func (qs *queueSet) findDispatchQueueToBoundLocked() (*queue, *request) {
	minVirtualFinish := fqrequest.MaxSeatSeconds
	sMin := fqrequest.MaxSeatSeconds
	dsMin := fqrequest.MaxSeatSeconds
	sMax := fqrequest.MinSeatSeconds
	dsMax := fqrequest.MinSeatSeconds
	var minQueue *queue
	var minIndex int
	nq := len(qs.queues)
	for range qs.queues {
		qs.robinIndex = (qs.robinIndex + 1) % nq
		queue := qs.queues[qs.robinIndex]
		oldestWaiting, _ := queue.requestsWaiting.Peek()
		if oldestWaiting != nil {
			sMin = min(sMin, queue.nextDispatchR)
			sMax = max(sMax, queue.nextDispatchR)
			estimatedWorkInProgress := fqrequest.SeatsTimesDuration(float64(queue.seatsInUse), qs.estimatedServiceDuration)
			dsMin = min(dsMin, queue.nextDispatchR-estimatedWorkInProgress)
			dsMax = max(dsMax, queue.nextDispatchR-estimatedWorkInProgress)
			currentVirtualFinish := queue.nextDispatchR + oldestWaiting.totalWork()
			klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish)
			if currentVirtualFinish < minVirtualFinish {
				minVirtualFinish = currentVirtualFinish
				minQueue = queue
				minIndex = qs.robinIndex
			}
		}
	}

	oldestReqFromMinQueue, _ := minQueue.requestsWaiting.Peek()
	if oldestReqFromMinQueue == nil {
		// This cannot happen
		klog.ErrorS(errors.New("selected queue is empty"), "Impossible", "queueSet", qs.qCfg.Name)
		return nil, nil
	}
	if !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.MaxSeats()) {
		// since we have not picked the queue with the minimum virtual finish
		// time, we are not going to advance the round robin index here.
		klogV := klog.V(4)
		if klogV.Enabled() {
			klogV.Infof("QS(%s): request %v %v seats %d cannot be dispatched from queue %d, waiting for currently executing requests to complete, %d requests are occupying %d seats and the limit is %d",
				qs.qCfg.Name, oldestReqFromMinQueue.descr1, oldestReqFromMinQueue.descr2, oldestReqFromMinQueue.MaxSeats(), minQueue.index, qs.totRequestsExecuting, qs.totSeatsInUse, qs.dCfg.ConcurrencyLimit)
		}
		metrics.AddDispatchWithNoAccommodation(qs.qCfg.Name, oldestReqFromMinQueue.fsName)
		return nil, nil
	}
	oldestReqFromMinQueue.removeFromQueueLocked()

	// If the requested final seats exceed capacity of that queue,
	// we reduce them to current capacity and adjust additional latency
	// to preserve the total amount of work.
	if oldestReqFromMinQueue.workEstimate.FinalSeats > uint64(qs.dCfg.ConcurrencyLimit) {
		finalSeats := uint64(qs.dCfg.ConcurrencyLimit)
		additionalLatency := oldestReqFromMinQueue.workEstimate.finalWork.DurationPerSeat(float64(finalSeats))
		oldestReqFromMinQueue.workEstimate.FinalSeats = finalSeats
		oldestReqFromMinQueue.workEstimate.AdditionalLatency = additionalLatency
	}

	// we set the round robin indexing to start at the chose queue
	// for the next round.  This way the non-selected queues
	// win in the case that the virtual finish times are the same
	qs.robinIndex = minIndex

	if minQueue.nextDispatchR < oldestReqFromMinQueue.arrivalR {
		klog.ErrorS(errors.New("dispatch before arrival"), "Inconceivable!", "QS", qs.qCfg.Name, "queue", minQueue.index, "dispatchR", minQueue.nextDispatchR, "request", oldestReqFromMinQueue)
	}
	metrics.SetDispatchMetrics(qs.qCfg.Name, qs.currentR.ToFloat(), minQueue.nextDispatchR.ToFloat(), sMin.ToFloat(), sMax.ToFloat(), dsMin.ToFloat(), dsMax.ToFloat())
	return minQueue, oldestReqFromMinQueue
}

通过这个调度过程,我们可以看出几点特别的考量:

  • api-server采用多队列管理请求,有效防止调度策略退化为:按照请求到达顺序进行顺序执行。有效的保证了相应的时效性(即优先调度负载较小的请求)。
  • api-server在每个队列内部则是按照请求的到达先后顺序进行调度,有效的兼顾了公平性,防止高负载任务始终无法得到调度。

可以看出通过将多个先进先出队列结合,对请求进行调度,既保证了调度的公平性,也保证了调度的时效性,值得我们在项目中借鉴。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • api-server简介
  • api-server的请求管理
    • 基本数据结构
    • 新请求如何被处理
    • 请求调度策略
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档