type queueSet struct {
// queues may be longer than the desired number, while the excess
// queues are still draining.
queues []*queue
type queue struct {
// The requestsWaiting not yet executing in the real world are stored in a FIFO list.
requestsWaiting fifo
// nextDispatchR is the R progress meter reading at
// which the next request will be dispatched in the virtual world.
nextDispatchR fcrequest.SeatSeconds
// requestsExecuting is the set of requests executing in the real world.
requestsExecuting sets.Set[*request]
// index is the position of this queue among those in its queueSet.
index int
// seatsInUse is the total number of "seats" currently occupied
// by all the requests that are currently executing in this queue.
seatsInUse int
func (qs *queueSet) shuffleShardAndRejectOrEnqueueLocked(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
// Start with the shuffle sharding, to pick a queue.
queueIdx := qs.shuffleShardLocked(hashValue, descr1, descr2)
queue := qs.queues[queueIdx]
defer qs.boundNextDispatchLocked(queue)
// Create a request and enqueue
req := &request{
qs: qs,
fsName: fsName,
flowDistinguisher: flowDistinguisher,
ctx: ctx,
decision: qs.promiseFactory(nil, ctx, decisionCancel),
arrivalTime: qs.clock.Now(),
arrivalR: qs.currentR,
queue: queue,
descr1: descr1,
descr2: descr2,
queueNoteFn: queueNoteFn,
workEstimate: qs.completeWorkEstimate(workEstimate),
if ok := qs.rejectOrEnqueueToBoundLocked(req); !ok {
return nil
metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, queue.requestsWaiting.Length())
return req
// findDispatchQueueToBoundLocked examines the queues in round robin order and
// returns the first one of those for which the virtual finish time of
// the oldest waiting request is minimal, and also returns that request.
// Returns nils if the head of the selected queue can not be dispatched now,
// in which case the caller does not need to follow up with`qs.boundNextDispatchLocked`.
func (qs *queueSet) findDispatchQueueToBoundLocked() (*queue, *request) {
minVirtualFinish := fqrequest.MaxSeatSeconds
sMin := fqrequest.MaxSeatSeconds
dsMin := fqrequest.MaxSeatSeconds
sMax := fqrequest.MinSeatSeconds
dsMax := fqrequest.MinSeatSeconds
var minQueue *queue
var minIndex int
nq := len(qs.queues)
for range qs.queues {
qs.robinIndex = (qs.robinIndex + 1) % nq
queue := qs.queues[qs.robinIndex]
oldestWaiting, _ := queue.requestsWaiting.Peek()
if oldestWaiting != nil {
sMin = min(sMin, queue.nextDispatchR)
sMax = max(sMax, queue.nextDispatchR)
estimatedWorkInProgress := fqrequest.SeatsTimesDuration(float64(queue.seatsInUse), qs.estimatedServiceDuration)
dsMin = min(dsMin, queue.nextDispatchR-estimatedWorkInProgress)
dsMax = max(dsMax, queue.nextDispatchR-estimatedWorkInProgress)
currentVirtualFinish := queue.nextDispatchR + oldestWaiting.totalWork()
klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish)
if currentVirtualFinish < minVirtualFinish {
minVirtualFinish = currentVirtualFinish
minQueue = queue
minIndex = qs.robinIndex
oldestReqFromMinQueue, _ := minQueue.requestsWaiting.Peek()
if oldestReqFromMinQueue == nil {
// This cannot happen
klog.ErrorS(errors.New("selected queue is empty"), "Impossible", "queueSet", qs.qCfg.Name)
return nil, nil
if !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.MaxSeats()) {
// since we have not picked the queue with the minimum virtual finish
// time, we are not going to advance the round robin index here.
klogV := klog.V(4)
if klogV.Enabled() {
klogV.Infof("QS(%s): request %v %v seats %d cannot be dispatched from queue %d, waiting for currently executing requests to complete, %d requests are occupying %d seats and the limit is %d",
qs.qCfg.Name, oldestReqFromMinQueue.descr1, oldestReqFromMinQueue.descr2, oldestReqFromMinQueue.MaxSeats(), minQueue.index, qs.totRequestsExecuting, qs.totSeatsInUse, qs.dCfg.ConcurrencyLimit)
metrics.AddDispatchWithNoAccommodation(qs.qCfg.Name, oldestReqFromMinQueue.fsName)
return nil, nil
// If the requested final seats exceed capacity of that queue,
// we reduce them to current capacity and adjust additional latency
// to preserve the total amount of work.
if oldestReqFromMinQueue.workEstimate.FinalSeats > uint64(qs.dCfg.ConcurrencyLimit) {
finalSeats := uint64(qs.dCfg.ConcurrencyLimit)
additionalLatency := oldestReqFromMinQueue.workEstimate.finalWork.DurationPerSeat(float64(finalSeats))
oldestReqFromMinQueue.workEstimate.FinalSeats = finalSeats
oldestReqFromMinQueue.workEstimate.AdditionalLatency = additionalLatency
// we set the round robin indexing to start at the chose queue
// for the next round. This way the non-selected queues
// win in the case that the virtual finish times are the same
qs.robinIndex = minIndex
if minQueue.nextDispatchR < oldestReqFromMinQueue.arrivalR {
klog.ErrorS(errors.New("dispatch before arrival"), "Inconceivable!", "QS", qs.qCfg.Name, "queue", minQueue.index, "dispatchR", minQueue.nextDispatchR, "request", oldestReqFromMinQueue)
metrics.SetDispatchMetrics(qs.qCfg.Name, qs.currentR.ToFloat(), minQueue.nextDispatchR.ToFloat(), sMin.ToFloat(), sMax.ToFloat(), dsMin.ToFloat(), dsMax.ToFloat())
return minQueue, oldestReqFromMinQueue
如有侵权,请联系 cloudcommunity@tencent.com 删除。
如有侵权,请联系 cloudcommunity@tencent.com 删除。