Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Kubernetes 源码剖析之 WorkQueue 队列 | 文末送书

Kubernetes 源码剖析之 WorkQueue 队列 | 文末送书

作者头像
米开朗基杨
发布于 2020-07-09 09:55:35
发布于 2020-07-09 09:55:35
2.8K10
代码可运行
举报
文章被收录于专栏:云原生实验室云原生实验室
运行总次数:0
代码可运行

Docker 技术鼻祖系列

文末直接送 5 本《Kubernetes 源码剖析》。

在 Kubernetes 系统中,组件之间通过 HTTP 协议进行通信,在不依赖任何中间件的情况下需要保证消息的实时性、可靠性、顺序性等。那么 Kubernetes 是如何做到的呢?答案就是 Informer 机制。Kubernetes 的其他组件都是通过 client-go 的 Informer 机制与 Kubernetes API Server 进行通信的。

而 Informer 又需要和 ReflectorDelta FIFO QueueWorkqueue 等协同工作,具体可以参考我之前的一篇文章:从 Kubernetes 资源控制到开放应用模型,控制器的进化之旅

本文主要通过 WorkQueue 的源码来分析其工作原理。

WorkQueue 称为工作队列,Kubernetes 的 WorkQueue 队列与普通 FIFO(先进先出,First-In, First-Out)队列相比,实现略显复杂,它的主要功能在于标记和去重,并支持如下特性。

  • 有序:按照添加顺序处理元素(item)。
  • 去重:相同元素在同一时间不会被重复处理,例如一个元素在处理之前被添加了多次,它只会被处理一次。
  • 并发性:多生产者和多消费者。
  • 标记机制:支持标记功能,标记一个元素是否被处理,也允许元素在处理时重新排队。
  • 通知机制:ShutDown 方法通过信号量通知队列不再接收新的元素,并通知 metric goroutine 退出。
  • 延迟:支持延迟队列,延迟一段时间后再将元素存入队列。
  • 限速:支持限速队列,元素存入队列时进行速率限制。限制一个元素被重新排队(Reenqueued)的次数。
  • Metric:支持 metric 监控指标,可用于 Prometheus 监控。

WorkQueue 支持 3 种队列,并提供了 3 种接口,不同队列实现可应对不同的使用场景,分别介绍如下。

  • Interface:FIFO 队列接口,先进先出队列,并支持去重机制。
  • DelayingInterface:延迟队列接口,基于 Interface 接口封装,延迟一段时间后再将元素存入队列。
  • RateLimitingInterface:限速队列接口,基于 DelayingInterface 接口封装,支持元素存入队列时进行速率限制。

1. FIFO 队列

FIFO 队列支持最基本的队列方法,例如插入元素、获取元素、获取队列长度等。另外,WorkQueue 中的限速及延迟队列都基于 Interface 接口实现,其提供如下方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 代码路径:vendor/k8s.io/client-go/util/workqueue/queue.go

type Interface interface {

    Add(item interface{})

    Len() int

    Get() (item interface{}, shutdown bool)

    Done(item interface{})

    ShutDown()

    ShuttingDown() bool

}

FIFO 队列 Interface 方法说明如下。

  • Add:给队列添加元素(item),可以是任意类型元素。
  • Len:返回当前队列的长度。
  • Get:获取队列头部的一个元素。
  • Done:标记队列中该元素已被处理。
  • ShutDown:关闭队列。
  • ShuttingDown:查询队列是否正在关闭。

FIFO 队列数据结构如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type Type struct {

    queue []t

    dirty set

    processing set

    cond *sync.Cond

    shuttingDown bool

    metrics queueMetrics

    unfinishedWorkUpdatePeriod time.Duration

    clock                      clock.Clock

}

FIFO 队列数据结构中最主要的字段有 queuedirtyprocessing。其中 queue 字段是实际存储元素的地方,它是 slice 结构的,用于保证元素有序;dirty 字段非常关键,除了能保证去重,还能保证在处理一个元素之前哪怕其被添加了多次(并发情况下),但也只会被处理一次;processing 字段用于标记机制,标记一个元素是否正在被处理。应根据 WorkQueue 的特性理解源码的实现,FIFO 存储过程如图 5-9 所示:

图5-9 FIFO存储过程

通过 Add 方法往 FIFO 队列中分别插入 1、2、3 这 3 个元素,此时队列中的 queue 和 dirty 字段分别存有 1、2、3 元素,processing 字段为空。然后通过 Get 方法获取最先进入的元素(也就是 1 元素),此时队列中的 queue 和 dirty 字段分别存有 2、3 元素,而 1 元素会被放入 processing 字段中,表示该元素正在被处理。最后,当我们处理完 1 元素时,通过 Done 方法标记该元素已经被处理完成,此时队列中的 processing 字段中的 1 元素会被删除。

如图 5-9 所示,这是 FIFO 队列的存储流程,在正常的情况下,FIFO 队列运行在并发场景下。高并发下如何保证在处理一个元素之前哪怕其被添加了多次,但也只会被处理一次?下面进行讲解,FIFO 并发存储过程如图 5-10 所示。

图5-10 FIFO并发存储过程

如图 5-10 所示,在并发场景下,假设 goroutine A 通过 Get 方法获取 1 元素,1 元素被添加到 processing 字段中,同一时间,goroutine B 通过 Add 方法插入另一个 1 元素,此时在 processing 字段中已经存在相同的元素,所以后面的 1 元素并不会被直接添加到 queue 字段中,当前 FIFO 队列中的 dirty 字段中存有 1、2、3 元素,processing 字段存有 1 元素。在 goroutine A 通过 Done 方法标记处理完成后,如果 dirty 字段中存有 1 元素,则将 1 元素追加到 queue 字段中的尾部。需要注意的是,dirty 和 processing 字段都是用 Hash Map 数据结构实现的,所以不需要考虑无序,只保证去重即可。

2. 延迟队列

延迟队列,基于 FIFO 队列接口封装,在原有功能上增加了 AddAfter 方法,其原理是延迟一段时间后再将元素插入 FIFO 队列。延迟队列数据结构如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 代码路径:vendor/k8s.io/client-go/util/workqueue/delaying_queue.go

type DelayingInterface interface {
    Interface
    AddAfter(item interface{}, duration time.Duration)
}

type delayingType struct {
    Interface
    clock clock.Clock
    stopCh chan struct{}
    heartbeat clock.Ticker
    waitingForAddCh chan *waitFor
    metrics           retryMetrics
    deprecatedMetrics retryMetrics
}

AddAfter 方法会插入一个 item(元素)参数,并附带一个 duration(延迟时间)参数,该 duration 参数用于指定元素延迟插入 FIFO 队列的时间。如果 duration 小于或等于 0,会直接将元素插入 FIFO 队列中。

delayingType 结构中最主要的字段是 waitingForAddCh,其默认初始大小为 1000,通过 AddAfter 方法插入元素时,是非阻塞状态的,只有当插入的元素大于或等于 1000 时,延迟队列才会处于阻塞状态。waitingForAddCh 字段中的数据通过 goroutine 运行的 waitingLoop 函数持久运行。延迟队列运行原理如图 5-11 所示。

图5-11 延迟队列运行原理

如图 5-11 所示,将元素 1 放入 waitingForAddCh 字段中,通过 waitingLoop 函数消费元素数据。当元素的延迟时间不大于当前时间时,说明还需要延迟将元素插入 FIFO 队列的时间,此时将该元素放入优先队列(waitForPriorityQueue)中。当元素的延迟时间大于当前时间时,则将该元素插入 FIFO 队列中。另外,还会遍历优先队列(waitForPriorityQueue)中的元素,按照上述逻辑验证时间。

3. 限速队列

限速队列,基于延迟队列和 FIFO 队列接口封装,限速队列接口(RateLimitingInterface)在原有功能上增加了 AddRateLimitedForgetNumRequeues 方法。限速队列的重点不在于 RateLimitingInterface 接口,而在于它提供的 4 种限速算法接口(RateLimiter)。其原理是,限速队列利用延迟队列的特性,延迟某个元素的插入时间,达到限速目的。RateLimiter 数据结构如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 代码路径:vendor/k8s.io/client-go/util/workqueue/default_rate_limiters.go

type RateLimiter interface {
    When(item interface{}) time.Duration
    Forget(item interface{})
    NumRequeues(item interface{}) int
}

限速队列接口方法说明如下。

  • When:获取指定元素应该等待的时间。
  • Forget:释放指定元素,清空该元素的排队数。
  • NumRequeues:获取指定元素的排队数。

注意:这里有一个非常重要的概念——限速周期。限速周期是指从执行 AddRateLimited 方法到执行完 Forget 方法之前的时间。如果该元素被 Forget 方法处理完,从清空队列数。

下面会分别详解 WorkQueue 提供的 4 种限速算法,应对不同的场景,这 4 种限速算法分别如下。

  • 令牌桶算法(BucketRateLimiter)。
  • 排队指数算法(ItemExponentialFailureRateLimiter)。
  • 计数器算法(ItemFastSlowRateLimiter)。
  • 混合模式(MaxOfRateLimiter),将多种限速算法混合使用。

令牌桶算法

令牌桶算法是通过 Go 语言的第三方库 golang.org/x/time/rate 实现的。令牌桶算法内部实现了一个存放 token(令牌)的“桶”,初始时“桶”是空的,token 会以固定速率往“桶”里填充,直到将其填满为止,多余的 token 会被丢弃。每个元素都会从令牌桶得到一个 token,只有得到 token 的元素才允许通过(accept),而没有得到 token 的元素处于等待状态。令牌桶算法通过控制发放 token 来达到限速目的。令牌桶算法原理如图 5-12 所示。

图5-12 令牌桶算法原理

WorkQueue 在默认的情况下会实例化令牌桶,代码示例如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
rate.NewLimiter(rate.Limit(10), 100)

在实例化 rate.NewLimiter 后,传入 rb 两个参数,其中 r 参数表示每秒往“桶”里填充的 token 数量,b 参数表示令牌桶的大小(即令牌桶最多存放的 token 数量)。我们假定 r 为 10,b 为 100。假设在一个限速周期内插入了 1000 个元素,通过 r.Limiter.Reserve().Delay 函数返回指定元素应该等待的时间,那么前 b(即 100)个元素会被立刻处理,而后面元素的延迟时间分别为 item100/100msitem101/200msitem102/300msitem103/400ms,以此类推。

排队指数算法

排队指数算法将相同元素的排队数作为指数,排队数增大,速率限制呈指数级增长,但其最大值不会超过 maxDelay。元素的排队数统计是有限速周期的,一个限速周期是指从执行 AddRateLimited 方法到执行完 Forget 方法之间的时间。如果该元素被 Forget 方法处理完,则清空排队数。排队指数算法的核心实现代码示例如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 代码路径:vendor/k8s.io/client-go/util/workqueue/default_rate_limiters.go

r.failures[item] = r.failures[item] + 1
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
    return r.maxDelay
}

该算法提供了 3 个主要字段:failuresbaseDelaymaxDelay。其中,failures 字段用于统计元素排队数,每当 AddRateLimited 方法插入新元素时,会为该字段加 1;另外,baseDelay 字段是最初的限速单位(默认为 5ms),maxDelay 字段是最大限速单位(默认为 1000s)。排队指数增长趋势如图 5-13 所示。

图5-13 排队指数增长趋势

限速队列利用延迟队列的特性,延迟多个相同元素的插入时间,达到限速目的。

注意:在同一限速周期内,如果不存在相同元素,那么所有元素的延迟时间为 baseDelay;而在同一限速周期内,如果存在相同元素,那么相同元素的延迟时间呈指数级增长,最长延迟时间不超过 baseDelay

们假定 baseDelay 是 1 * time.Millisecond,maxDelay 是 1000 * time.Second。假设在一个限速周期内通过 AddRateLimited 方法插入 10 个相同元素,那么第 1 个元素会通过延迟队列的 AddAfter 方法插入并设置延迟时间为 1ms(即 baseDelay),第 2 个相同元素的延迟时间为 2ms,第 3 个相同元素的延迟时间为 4ms,第 4 个相同元素的延迟时间为 8ms,第 5 个相同元素的延迟时间为 16ms……第 10 个相同元素的延迟时间为 512ms,最长延迟时间不超过 1000s(即 maxDelay)。

计数器算法

计数器算法是限速算法中最简单的一种,其原理是:限制一段时间内允许通过的元素数量,例如在 1 分钟内只允许通过 100 个元素,每插入一个元素,计数器自增 1,当计数器数到 100 的阈值且还在限速周期内时,则不允许元素再通过。但 WorkQueue 在此基础上扩展了 fast 和 slow 速率。

计数器算法提供了 4 个主要字段:failuresfastDelayslowDelaymaxFastAttempts。其中,failures 字段用于统计元素排队数,每当 AddRateLimited 方法插入新元素时,会为该字段加 1;而 fastDelayslowDelay 字段是用于定义 fast、slow 速率的;另外,maxFastAttempts 字段用于控制从 fast 速率转换到 slow 速率。计数器算法核心实现的代码示例如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
r.failures[item] = r.failures[item] + 1
if r.failures[item] <= r.maxFastAttempts {
    return r.fastDelay
}
return r.slowDelay

假设 fastDelay 是 5 * time.Millisecond,slowDelay 是 10 * time.SecondmaxFastAttempts 是 3。在一个限速周期内通过 AddRateLimited 方法插入 4 个相同的元素,那么前 3 个元素使用 fastDelay 定义的 fast 速率,当触发 maxFastAttempts 字段时,第 4 个元素使用 slowDelay 定义的 slow 速率。

混合模式

混合模式是将多种限速算法混合使用,即多种限速算法同时生效。例如,同时使用排队指数算法和令牌桶算法,代码示例如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func DefaultControllerRateLimiter() RateLimiter {
    return NewMaxOfRateLimiter(
        NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
        &BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
    )
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-07-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 云原生实验室 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
1 条评论
热度
最新
源码中关于NumRequeues注释的: // NumRequeues returns back how many failures the item has had NumRequeues(item interface{}) int 说明NumRequeues的意思是返回item处理的失败次数。 而原文中NumRequeues的注释"获取指定元素的排队数",是不是有点太离谱了呢?
源码中关于NumRequeues注释的: // NumRequeues returns back how many failures the item has had NumRequeues(item interface{}) int 说明NumRequeues的意思是返回item处理的失败次数。 而原文中NumRequeues的注释"获取指定元素的排队数",是不是有点太离谱了呢?
回复回复点赞举报
推荐阅读
编辑精选文章
换一批
k8s-client-go源码剖析(三)
本文主要讲述下client-go中workqueue, 看一下client-go的一个整体数据走向.如下图:
用户2672162
2021/02/03
7130
Kubernetes 源码学习之限速队列
前面我们详细分析了 client-go 中的延迟队列的实现,接下来就是限速队列的实现,限速队列在我们日常应用中非常广泛,其原理也比较简单,利用延迟队列的特性,延迟某个元素的插入时间来达到限速的目的。
我是阳明
2020/09/30
3.4K0
Kubernetes 源码学习之限速队列
client-go 源码分析(9) - workerqueue之限速队列RateLimitingQueue
workerqueue的限速队列RateLimitingQueue搞明白三件事就可以了。
后端云
2023/02/10
8070
client-go 源码分析(9) - workerqueue之限速队列RateLimitingQueue
k8s client-go 的 workqueue 源码分析
本文阅读代码链接:https://github.com/kubernetes/client-go/tree/release-1.30
rxg456
2025/03/18
580
k8s client-go 的 workqueue 源码分析
kubernetes client-go解析
Indexer保存了来自apiServer的资源。使用listWatch方式来维护资源的增量变化。通过这种方式可以减小对apiServer的访问,减轻apiServer端的压力
charlieroro
2020/03/24
1.3K1
kubernetes client-go解析
【笔记】Operator课程(7-9)
Indexer缓存k8s资源对象,并提供便捷的方式查询。例如获取某个namespace下的所有资源
Yuyy
2023/04/12
2110
【笔记】Operator课程(7-9)
Kubernetes 源码学习之延时队列
client-go 中的 workqueue,类似于 golang 语言中的 channel,主要用于并发程序之间的数据同步。Kubernetes 的控制器模型通过 client-go 的 informer watch 资源变化,当资源发生变化时会通过回调函数将资源写入队列,由控制器中的消费者完成业务处理。
我是阳明
2020/09/22
1.4K0
Kubernetes 源码学习之延时队列
client-go 源码分析(7) - workerqueue之普通队列Queue
client-go 的 util/workqueue 包里主要有三个队列,分别是普通队列Queue,延时队列DelayingQueue,限速队列RateLimitingQueue,后一个队列以前一个队列的实现为基础,层层添加新功能。
后端云
2023/02/10
2880
client-go 源码分析(7) - workerqueue之普通队列Queue
client-go 源码分析(8) - workerqueue之延时队列DelayingQueue
延时队列DelayingQueue,从下面的接口可以看出添加的元素,有一个延迟时间,延时时间到了之后才能加入队列。
后端云
2023/02/10
5200
client-go 源码分析(8) - workerqueue之延时队列DelayingQueue
高并发场景,nginx怎么限速
我们会通过一些简单的示例展示Nginx限速限流模块是如何工作的,然后结合代码讲解其背后的算法和原理。
后端技术探索
2018/12/05
1.9K0
高并发场景,nginx怎么限速
除了MySQL,大牛DBA还会啥?
写在前面:想要流畅阅读本文,需要读者——对K8s的架构有简单了解,理解API Server扮演的角色;具有阅读简单golang源码的能力,包括函数/类方法定义、变量声明等。 如何理解Controller 先引用一段官方的解释: 当你设置了温度,告诉了温度自动调节器你的期望状态(Desired State)。房间的实际温度是当前状态(Current State)。通过对设备的开关控制,温度自动调节器让其当前状态接近期望状态。 控制器通过 apiserver 监控集群的公共状态,并致力于将当前状态转变为期望
腾讯云数据库 TencentDB
2020/10/14
7590
H3C Qos
配置802.1p优先级到本地优先级映射表,将802.1p优先级3、4、5对应的本地优先级配置为2、6、4。保证访问服务器的优先级为研发部门(6)>管理部门(4)>市场部门(2)。
py3study
2020/01/13
1K0
QoS 工作原理与相关技术细节
VOIP:Voice Over Internet Protocol 模拟声音讯号数字化
利又德智能感知
2022/11/07
9141
Kubernetes Controller 机制详解
Kubernetes API List/Watch 机制 与 Informer 客户端库
SRE运维进阶之路
2024/04/23
3370
Kubernetes Controller 机制详解
浅析 Kubernetes 控制器的工作原理
Kubernetes 中运行了一系列控制器来确保集群的当前状态与期望状态保持一致,它们就是 Kubernetes 的大脑。例如,ReplicaSet 控制器负责维护集群中运行的 Pod 数量;Node 控制器负责监控节点的状态,并在节点出现故障时及时做出响应。总而言之,在 Kubernetes 中,每个控制器只负责某种类型的特定资源。对于集群管理员来说,了解每个控制器的角色分工至关重要,如有必要,你还需要深入了解控制器的工作原理。
米开朗基杨
2019/08/29
9.2K0
浅析 Kubernetes 控制器的工作原理
client-go实战之九:手写一个kubernetes的controller
欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos 系列文章链接 client-go实战之一:准备工作 client-go实战之二:RESTClient client-go实战之三:Clientset client-go实战之四:dynamicClient client-go实战之五:DiscoveryClient client-go实战之六:时隔两年,刷新版本继续实战 client-go实战之七:准备一个工
程序员欣宸
2023/02/13
1.6K0
client-go实战之九:手写一个kubernetes的controller
client-go 源码分析(10) - 使用client-go实现一个简单controller的例子
下面的example也是client-go官方的例子。通过这个简单的例子正好把之前的源码分析的一个个模块都串起来了。
后端云
2023/02/10
7990
client-go 源码分析(10) - 使用client-go实现一个简单controller的例子
[从源码学设计]蚂蚁金服SOFARegistry 之 自动调节间隔周期性任务
SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。
罗西的思考
2020/12/22
5770
【云顾问-混沌】Linux的网络管理神器-tc qdisc
在介绍tc qdisc之前,先解释下tc是什么, tc(traffic control)是Linux内核中的一个网络流量控制工具,它可以用来控制网络流量的带宽、延迟、丢包等参数,从而实现网络流量的优化和管理。详细介绍可以参考Linux TC工具的官方文档和man手册。而qdisc (queueing disciplines), 是tc工具中的一部分,叫做队列规则,是一种可以定义Linux网络流量队列规则的一种机制,可以进行流量排队、调度以及限速等操作,达到对网络流量的精细控制和管理。如下是几个qdisc的例子:
冷淡然
2023/06/27
4.4K0
【云顾问-混沌】Linux的网络管理神器-tc qdisc
【深入浅出】Kubernetes控制器:云原生架构的无形守护者
在云原生的海洋中,Kubernetes如同一艘航母,它的控制器系统则是维持应用稳定运行的“自动驾驶仪”。今天,让我们一起研究下控制器,深入理解它如何精确地管理我们的容器应用。
希里安
2024/01/30
1960
【深入浅出】Kubernetes控制器:云原生架构的无形守护者
相关推荐
k8s-client-go源码剖析(三)
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验