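In Kubernetes, api-server is the hub through which cluster information flows. Components such as kubectl, kube-scheduler, and kubelet all interact with api-server to monitor and operate the cluster, and api-server is also the component that reads and writes etcd. The whole system therefore places extremely high demands on the stability of api-server. To keep a flood of requests from bringing api-server down, the Kubernetes authors adopted fairly elaborate request scheduling logic: it keeps responses timely, gives every request a fair chance of being handled, and applies rate limiting so that api-server is not overwhelmed. Let's open the Kubernetes source and see how api-server performs rate limiting and request scheduling.
api-server manages requests with multiple first-in, first-out queues and schedules them on a least-loaded, first-arrived basis.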
api-server manages requests with an array of buffers. The struct queueSet, defined in staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go, is the core structure responsible for managing requests:
type queueSet struct {
    //......
    // queues may be longer than the desired number, while the excess
    // queues are still draining.
    queues []*queue
    //......
}

type queue struct {
    // The requestsWaiting not yet executing in the real world are stored in a FIFO list.
    requestsWaiting fifo
    // nextDispatchR is the R progress meter reading at
    // which the next request will be dispatched in the virtual world.
    nextDispatchR fcrequest.SeatSeconds
    // requestsExecuting is the set of requests executing in the real world.
    requestsExecuting sets.Set[*request]
    // index is the position of this queue among those in its queueSet.
    index int
    // seatsInUse is the total number of "seats" currently occupied
    // by all the requests that are currently executing in this queue.
    seatsInUse int
}
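The queues member of queueSet is a slice of queue, and each queue holds incoming external requests.
A queue manages its waiting requests with a doubly linked list (fifo). New requests are inserted at the tail, while the dispatch stage takes requests from the head. A doubly linked list reaches both ends and appends at the tail in constant time, which is why Kubernetes uses it to store requests waiting to be dispatched.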
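To make the head and tail access pattern concrete, here is a minimal sketch of such a FIFO built on Go's standard container/list. The waitingFIFO type and its methods are hypothetical names chosen for illustration; they are not the fifo type from the Kubernetes source, and requests are reduced to plain strings.

```go
package main

import (
	"container/list"
	"fmt"
)

// waitingFIFO is a hypothetical, simplified stand-in for the fifo
// member of queue; it is not the Kubernetes type.
type waitingFIFO struct {
	items *list.List
}

func newWaitingFIFO() *waitingFIFO {
	return &waitingFIFO{items: list.New()}
}

// Enqueue appends a request at the tail in constant time.
func (f *waitingFIFO) Enqueue(name string) {
	f.items.PushBack(name)
}

// Peek returns the oldest request without removing it.
func (f *waitingFIFO) Peek() (string, bool) {
	front := f.items.Front()
	if front == nil {
		return "", false
	}
	return front.Value.(string), true
}

// Dequeue removes and returns the oldest request in constant time.
func (f *waitingFIFO) Dequeue() (string, bool) {
	front := f.items.Front()
	if front == nil {
		return "", false
	}
	value := f.items.Remove(front)
	return value.(string), true
}

// Length reports how many requests are still waiting.
func (f *waitingFIFO) Length() int {
	return f.items.Len()
}

func main() {
	q := newWaitingFIFO()
	q.Enqueue("list pods")
	q.Enqueue("get node")

	head, _ := q.Peek()
	fmt.Println("head:", head, "waiting:", q.Length())

	first, _ := q.Dequeue()
	fmt.Println("dispatched:", first, "waiting:", q.Length())
}
```

Arrival order is preserved because insertion only happens at the tail and removal only at the head.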
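The figure below shows how a new request joins a dispatch queue. Kubernetes first estimates the workload the new request will generate (covered in detail in a later article). It then walks over the candidate queues that shuffle sharding selects for this request, finds the one with the smallest total load, and appends the new request to it.
The function shuffleShardAndRejectOrEnqueueLocked, defined in staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go, implements this process. The shuffleShardLocked step picks the index of the least-loaded queue, and rejectOrEnqueueToBoundLocked then checks whether that queue has room for the current request. If it does not, the function returns nil, which means the request cannot be handled, and the client receives HTTP status code 429, indicating that the server is currently overloaded.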
func (qs *queueSet) shuffleShardAndRejectOrEnqueueLocked(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
    // Start with the shuffle sharding, to pick a queue.
    queueIdx := qs.shuffleShardLocked(hashValue, descr1, descr2)
    queue := qs.queues[queueIdx]
    defer qs.boundNextDispatchLocked(queue)
    // Create a request and enqueue
    req := &request{
        qs:                qs,
        fsName:            fsName,
        flowDistinguisher: flowDistinguisher,
        ctx:               ctx,
        decision:          qs.promiseFactory(nil, ctx, decisionCancel),
        arrivalTime:       qs.clock.Now(),
        arrivalR:          qs.currentR,
        queue:             queue,
        descr1:            descr1,
        descr2:            descr2,
        queueNoteFn:       queueNoteFn,
        workEstimate:      qs.completeWorkEstimate(workEstimate),
    }
    if ok := qs.rejectOrEnqueueToBoundLocked(req); !ok {
        return nil
    }
    metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, queue.requestsWaiting.Length())
    return req
}
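The enqueue path above can be reduced to a small standalone sketch: choose the least-loaded queue among a set of candidates, then either append the request or reject it. Everything here is an assumption made for illustration: toyQueue, pickLeastLoaded, enqueueOrReject, the hand slice of candidate indices, and the maxWaiting limit are hypothetical names, and the rejection test is deliberately simpler than whatever rejectOrEnqueueToBoundLocked actually checks.

```go
package main

import "fmt"

// toyQueue is a hypothetical simplification: it records only the
// estimated work (in seat-seconds) of each waiting request.
type toyQueue struct {
	waiting []float64
}

// totalWork sums the estimated work of all waiting requests.
func (q *toyQueue) totalWork() float64 {
	sum := 0.0
	for _, w := range q.waiting {
		sum += w
	}
	return sum
}

// pickLeastLoaded returns the index, among the candidates in hand, of
// the queue with the least waiting work. Ties go to the earlier
// candidate. It returns -1 when hand is empty.
func pickLeastLoaded(queues []*toyQueue, hand []int) int {
	best := -1
	bestWork := 0.0
	for _, idx := range hand {
		work := queues[idx].totalWork()
		if best == -1 || work < bestWork {
			best, bestWork = idx, work
		}
	}
	return best
}

// enqueueOrReject appends work to the chosen queue unless that queue
// already holds maxWaiting requests. A false result is the analogue of
// returning nil and answering the client with HTTP 429.
func enqueueOrReject(queues []*toyQueue, hand []int, work float64, maxWaiting int) (int, bool) {
	idx := pickLeastLoaded(queues, hand)
	if idx < 0 || len(queues[idx].waiting) >= maxWaiting {
		return idx, false
	}
	queues[idx].waiting = append(queues[idx].waiting, work)
	return idx, true
}

func main() {
	queues := []*toyQueue{
		{waiting: []float64{3, 2}},
		{waiting: []float64{1}},
		{waiting: []float64{4}},
	}
	hand := []int{0, 1, 2}

	idx, ok := enqueueOrReject(queues, hand, 2.5, 2)
	fmt.Println("queue:", idx, "accepted:", ok)

	idx, ok = enqueueOrReject(queues, hand, 1, 2)
	fmt.Println("queue:", idx, "accepted:", ok)
}
```

In this run the second request is rejected even though other queues still have room, because only the least-loaded candidate is checked. That mirrors the two-step structure described above: selection first, admission second.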
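When dispatching requests, Kubernetes follows arrival order, keeping responses timely while still being fair across queues.
The example in the figure below shows one dispatch round.
All the details of this algorithm are defined in staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go.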
// findDispatchQueueToBoundLocked examines the queues in round robin order and
// returns the first one of those for which the virtual finish time of
// the oldest waiting request is minimal, and also returns that request.
// Returns nils if the head of the selected queue can not be dispatched now,
// in which case the caller does not need to follow up with `qs.boundNextDispatchLocked`.
func (qs *queueSet) findDispatchQueueToBoundLocked() (*queue, *request) {
    minVirtualFinish := fqrequest.MaxSeatSeconds
    sMin := fqrequest.MaxSeatSeconds
    dsMin := fqrequest.MaxSeatSeconds
    sMax := fqrequest.MinSeatSeconds
    dsMax := fqrequest.MinSeatSeconds
    var minQueue *queue
    var minIndex int
    nq := len(qs.queues)
    for range qs.queues {
        qs.robinIndex = (qs.robinIndex + 1) % nq
        queue := qs.queues[qs.robinIndex]
        oldestWaiting, _ := queue.requestsWaiting.Peek()
        if oldestWaiting != nil {
            sMin = min(sMin, queue.nextDispatchR)
            sMax = max(sMax, queue.nextDispatchR)
            estimatedWorkInProgress := fqrequest.SeatsTimesDuration(float64(queue.seatsInUse), qs.estimatedServiceDuration)
            dsMin = min(dsMin, queue.nextDispatchR-estimatedWorkInProgress)
            dsMax = max(dsMax, queue.nextDispatchR-estimatedWorkInProgress)
            currentVirtualFinish := queue.nextDispatchR + oldestWaiting.totalWork()
            klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish)
            if currentVirtualFinish < minVirtualFinish {
                minVirtualFinish = currentVirtualFinish
                minQueue = queue
                minIndex = qs.robinIndex
            }
        }
    }

    oldestReqFromMinQueue, _ := minQueue.requestsWaiting.Peek()
    if oldestReqFromMinQueue == nil {
        // This cannot happen
        klog.ErrorS(errors.New("selected queue is empty"), "Impossible", "queueSet", qs.qCfg.Name)
        return nil, nil
    }
    if !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.MaxSeats()) {
        // since we have not picked the queue with the minimum virtual finish
        // time, we are not going to advance the round robin index here.
        klogV := klog.V(4)
        if klogV.Enabled() {
            klogV.Infof("QS(%s): request %v %v seats %d cannot be dispatched from queue %d, waiting for currently executing requests to complete, %d requests are occupying %d seats and the limit is %d",
                qs.qCfg.Name, oldestReqFromMinQueue.descr1, oldestReqFromMinQueue.descr2, oldestReqFromMinQueue.MaxSeats(), minQueue.index, qs.totRequestsExecuting, qs.totSeatsInUse, qs.dCfg.ConcurrencyLimit)
        }
        metrics.AddDispatchWithNoAccommodation(qs.qCfg.Name, oldestReqFromMinQueue.fsName)
        return nil, nil
    }
    oldestReqFromMinQueue.removeFromQueueLocked()

    // If the requested final seats exceed capacity of that queue,
    // we reduce them to current capacity and adjust additional latency
    // to preserve the total amount of work.
    if oldestReqFromMinQueue.workEstimate.FinalSeats > uint64(qs.dCfg.ConcurrencyLimit) {
        finalSeats := uint64(qs.dCfg.ConcurrencyLimit)
        additionalLatency := oldestReqFromMinQueue.workEstimate.finalWork.DurationPerSeat(float64(finalSeats))
        oldestReqFromMinQueue.workEstimate.FinalSeats = finalSeats
        oldestReqFromMinQueue.workEstimate.AdditionalLatency = additionalLatency
    }

    // we set the round robin indexing to start at the chose queue
    // for the next round. This way the non-selected queues
    // win in the case that the virtual finish times are the same
    qs.robinIndex = minIndex

    if minQueue.nextDispatchR < oldestReqFromMinQueue.arrivalR {
        klog.ErrorS(errors.New("dispatch before arrival"), "Inconceivable!", "QS", qs.qCfg.Name, "queue", minQueue.index, "dispatchR", minQueue.nextDispatchR, "request", oldestReqFromMinQueue)
    }
    metrics.SetDispatchMetrics(qs.qCfg.Name, qs.currentR.ToFloat(), minQueue.nextDispatchR.ToFloat(), sMin.ToFloat(), sMax.ToFloat(), dsMin.ToFloat(), dsMax.ToFloat())
    return minQueue, oldestReqFromMinQueue
}
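Stripped of metrics and logging, the selection logic in findDispatchQueueToBoundLocked can be sketched as a small runnable program. This is only an illustration under stated assumptions: toyRequest, toyQueue, toySet, and selectForDispatch are hypothetical names, work is a plain float64 rather than SeatSeconds, and advancing nextDispatchR by the dispatched work is this sketch's own bookkeeping rather than something the function above does itself.

```go
package main

import (
	"fmt"
	"math"
)

// toyRequest, toyQueue and toySet are hypothetical simplifications
// for illustration; they are not the Kubernetes types.
type toyRequest struct {
	name  string
	work  float64 // estimated total work, in seat-seconds
	seats int     // seats the request occupies while executing
}

type toyQueue struct {
	nextDispatchR float64      // virtual time at which the head would start
	waiting       []toyRequest // oldest request first
}

type toySet struct {
	queues     []*toyQueue
	robinIndex int
	seatsInUse int
	seatLimit  int
}

// selectForDispatch picks the head request with the smallest virtual
// finish time, scanning round robin from the queue after robinIndex.
// It returns false when nothing is waiting or the head does not fit.
func (s *toySet) selectForDispatch() (toyRequest, bool) {
	minFinish := math.Inf(1)
	minIndex := -1
	n := len(s.queues)
	for range s.queues {
		s.robinIndex = (s.robinIndex + 1) % n
		q := s.queues[s.robinIndex]
		if len(q.waiting) == 0 {
			continue
		}
		finish := q.nextDispatchR + q.waiting[0].work
		if finish < minFinish { // strict: the first queue visited wins ties
			minFinish, minIndex = finish, s.robinIndex
		}
	}
	if minIndex < 0 {
		return toyRequest{}, false
	}
	q := s.queues[minIndex]
	head := q.waiting[0]
	if s.seatsInUse+head.seats > s.seatLimit {
		return toyRequest{}, false // wait for executing requests to finish
	}
	q.waiting = q.waiting[1:]
	q.nextDispatchR += head.work // assumption: advance by the dispatched work
	s.seatsInUse += head.seats
	s.robinIndex = minIndex
	return head, true
}

func main() {
	s := &toySet{
		queues: []*toyQueue{
			{nextDispatchR: 0, waiting: []toyRequest{{"A", 4, 1}, {"B", 1, 1}}},
			{nextDispatchR: 1, waiting: []toyRequest{{"C", 2, 1}}},
		},
		seatLimit: 3,
	}
	for {
		req, ok := s.selectForDispatch()
		if !ok {
			break
		}
		fmt.Println("dispatch", req.name)
	}
}
```

Request B has the least work of the three, yet it cannot overtake A because only the head of each queue is considered. C goes first because its virtual finish time (1 + 2) is smaller than A's (0 + 4).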
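Several design considerations stand out in this dispatch process:
The scan starts at the queue after the one chosen last time, and only a strictly smaller virtual finish time replaces the current candidate, so on a tie the queues that were not chosen last round win.
The candidate is the queue whose oldest waiting request has the smallest virtual finish time, that is, nextDispatchR plus the request's total work. Requests inside a queue therefore keep their arrival order.
If the concurrency limit cannot accommodate the seats that request needs, the function returns nil rather than moving on to another queue, and the round robin index is left where it was.
If a request asks for more final seats than the concurrency limit, the seats are reduced to the limit and the additional latency is lengthened so that the total amount of work is unchanged.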
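The FinalSeats branch near the end of findDispatchQueueToBoundLocked is easy to check with a little arithmetic. The sketch below assumes that final work is simply final seats multiplied by additional latency; capFinalSeats is a hypothetical helper, not a Kubernetes function, and it assumes a positive limit.

```go
package main

import (
	"fmt"
	"time"
)

// capFinalSeats is a hypothetical helper: when finalSeats exceeds
// limit, it lowers the seats to limit and lengthens the latency so
// that seats * latency (the total final work) stays the same.
// It assumes limit > 0.
func capFinalSeats(finalSeats int, additionalLatency time.Duration, limit int) (int, time.Duration) {
	if finalSeats <= limit {
		return finalSeats, additionalLatency
	}
	work := time.Duration(finalSeats) * additionalLatency // seat-time product
	return limit, work / time.Duration(limit)
}

func main() {
	// 10 seats for 2s is 20 seat-seconds; with only 4 seats the same
	// work takes 5s.
	seats, latency := capFinalSeats(10, 2*time.Second, 4)
	fmt.Println(seats, latency)
}
```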
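In short, scheduling requests across multiple first-in, first-out queues preserves both fairness and timeliness, an approach worth borrowing in our own projects.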
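Original content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
In case of infringement, please contact cloudcommunity@tencent.com for removal.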