本文阅读代码链接:https://github.com/kubernetes/client-go/tree/release-1.30
下面代码全部在 tools/cache/delta_fifo.go 文件中:
DeltaFIFO 是 client-go 中的核心队列组件,作为 Reflector 和后续消费者之间的桥梁,负责存储资源对象的变更事件并确保这些事件按顺序被处理。其名称包含两个重要概念:
DeltaFIFO 解决以下用例:
type DeltaFIFO struct {
// 锁和条件变量
lock sync.RWMutex
cond sync.Cond
// 存储数据的主要字段
items map[string]Deltas // 键到 Deltas 列表的映射
queue []string // 保持 FIFO 顺序的键列表
// 同步状态追踪
populated bool
initialPopulationCount int
// 其他状态字段
keyFunc KeyFunc // 从对象获取键的函数
knownObjects KeyListerGetter // 已知对象存储,用于对比
closed bool
emitDeltaTypeReplaced bool
transformer TransformFunc
}
// Delta 表示一个变化事件,包含变化类型和对象本身
type Delta struct {
Type DeltaType // 变化类型
Object interface{} // 变化后的对象(删除事件则是删除前的最终状态)
}
// Deltas 是一个对象的多个 Delta 的列表,按时间顺序排列
type Deltas []Delta
type DeltaType string
const (
Added DeltaType = "Added" // 新增
Updated DeltaType = "Updated" // 更新
Deleted DeltaType = "Deleted" // 删除
Replaced DeltaType = "Replaced" // 替换(由于 watch 错误导致重新列举时)
Sync DeltaType = "Sync" // 同步(定期重新同步时)
)
所有事件入队操作最终都调用 queueActionLocked 方法:
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
// 获取对象键
// 重要性:整个队列围绕对象键构建,而非对象本身
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
// 转换对象(如果设置了transformer)
// 灵活性:允许在入队前修改或过滤对象
if f.transformer != nil {
obj, err = f.transformer(obj)
if err != nil {
return err
}
}
// 核心设计:每个键对应一个 Deltas 列表,记录所有历史变更
// 获取已有的事件列表
oldDeltas := f.items[id]
// 添加新事件
newDeltas := append(oldDeltas, Delta{actionType, obj})
// 事件去重
newDeltas = dedupDeltas(newDeltas)
// 将新的事件列表存储并通知消费者
if len(newDeltas) > 0 {
if _, exists := f.items[id]; !exists {
// 如果是新对象,将其ID加入队列
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast() // 通知等待的消费者
} else {
// 异常状态处理,理论上不可能的路径
if oldDeltas == nil {
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
return nil
}
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
f.items[id] = newDeltas
return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
}
return nil
}
队列维护与通知:
items:键到 Deltas 列表的映射(内容存储)
queue:保持 FIFO 顺序的键列表(顺序保证)
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()
DeltaFIFO 提供了多种方式将事件入队:
func (f *DeltaFIFO) Add(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
return f.queueActionLocked(Added, obj)
}
func (f *DeltaFIFO) Update(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
return f.queueActionLocked(Updated, obj)
}
func (f *DeltaFIFO) Delete(obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
// 特殊逻辑:如果对象已不存在,则跳过删除
if f.knownObjects == nil {
if _, exists := f.items[id]; !exists {
return nil // 对象已不存在,无需再次报告删除
}
} else {
// 更复杂的检查逻辑
_, exists, err := f.knownObjects.GetByKey(id)
_, itemsExist := f.items[id]
if err == nil && !exists && !itemsExist {
return nil // 对象在索引和队列中都不存在,跳过
}
}
return f.queueActionLocked(Deleted, obj)
}
使用场景:
// ListAndWatch 内部调用
err = r.list(stopCh) // List 操作获取全量数据
// ...在 list 方法中会调用
return r.store.Replace(found, resourceVersion) // 其中 r.store 就是 DeltaFIFO
// ...在 watchList 方法中也会调用
err = r.store.Replace(temporaryStore.List(), resourceVersion)
func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
f.lock.Lock()
defer f.lock.Unlock()
keys := make(sets.String, len(list))
// 决定使用哪种事件类型
action := Sync
if f.emitDeltaTypeReplaced {
action = Replaced
}
// 将新列表中的所有对象加入队列
for _, item := range list {
key, err := f.KeyOf(item)
if err != nil {
return KeyError{item, err}
}
keys.Insert(key)
if err := f.queueActionLocked(action, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
// 标记未在新列表中的对象为删除
queuedDeletions := 0
// 1. 检查队列中的对象
for k, oldItem := range f.items {
if keys.Has(k) {
continue
}
// 找出最新状态用于生成删除事件
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
// 处理特殊的删除状态对象
if d, ok := deletedObj.(DeletedFinalStateUnknown); ok {
deletedObj = d.Obj
}
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
// 2. 检查已知对象存储中的对象
if f.knownObjects != nil {
// ... 略
}
// 初始化填充计数,用于判断同步状态
if !f.populated {
f.populated = true
f.initialPopulationCount = keys.Len() + queuedDeletions
}
return nil
}
Replace 方法做三件重要的事:
最常见的使用场景是 Informer 中的定期重新同步机制
Resync 用于解决以下情况:
// Resync 将 f.knownObjects 中列出的每个对象都添加到队列中,添加时使用 Sync 类型的 Delta
// 但只处理那些尚未在处理队列中的对象键
// 如果 f.knownObjects 为 nil,则 Resync 不做任何操作
func (f *DeltaFIFO) Resync() error {
// 获取锁以保护对 DeltaFIFO 内部结构的访问
f.lock.Lock()
defer f.lock.Unlock()
// 如果没有配置 knownObjects 存储,则无需执行同步
if f.knownObjects == nil {
return nil
}
// 获取已知对象存储中的所有键
keys := f.knownObjects.ListKeys()
// 遍历每个键,将其加入处理队列
for _, k := range keys {
if err := f.syncKeyLocked(k); err != nil {
return err
}
}
return nil
}
Resync 的具体工作流程:
// syncKeyLocked 尝试将指定键对应的对象添加到处理队列
// 该方法假设调用者已经持有锁
func (f *DeltaFIFO) syncKeyLocked(key string) error {
// 从已知对象存储中获取对象
obj, exists, err := f.knownObjects.GetByKey(key)
if err != nil {
// 如果获取对象时发生错误,记录日志但不中断整个同步过程
klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
return nil
} else if !exists {
// 如果对象不存在于已知对象存储中,记录信息并跳过
klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
return nil
}
// 重要优化:如果我们正在执行 Resync(),且该对象已经有事件在队列中等待处理,
// 则忽略该对象的 Resync。这是为了避免竞争条件:重新同步可能使用对象的旧值,
// 因为将事件加入队列不会触发底层存储 <knownObjects> 的更新。
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
// 检查对象是否已在队列中
if len(f.items[id]) > 0 {
return nil
}
// 将对象以 Sync 类型添加到队列
if err := f.queueActionLocked(Sync, obj); err != nil {
return fmt.Errorf("couldn't queue object: %v", err)
}
return nil
}
每当新的事件被添加时,DeltaFIFO 会尝试去重:
func dedupDeltas(deltas Deltas) Deltas {
n := len(deltas)
if n < 2 {
return deltas
}
a := &deltas[n-1] // 最新事件
b := &deltas[n-2] // 次新事件
if out := isDup(a, b); out != nil {
deltas[n-2] = *out // 合并重复事件
return deltas[:n-1] // 返回缩减后的列表
}
return deltas
}
func isDup(a, b *Delta) *Delta {
// 目前只处理删除事件的去重
if out := isDeletionDup(a, b); out != nil {
return out
}
return nil
}
func isDeletionDup(a, b *Delta) *Delta {
if b.Type != Deleted || a.Type != Deleted {
return nil // 只有两个都是删除事件才考虑去重
}
// 保留包含更多信息的删除事件
if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
return a // 保留普通删除事件
}
return b // 保留 DeletedFinalStateUnknown 事件
}
关键设计思想
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
// 阻塞等待机制
// 等待队列有数据
for len(f.queue) == 0 {
if f.closed {
return nil, ErrFIFOClosed
}
if f.closed {
return nil, ErrFIFOClosed
}
f.cond.Wait() // 阻塞等待生产者唤醒
}
// 判断是否在初始同步列表中
isInInitialList := !f.hasSynced_locked()
// 从队列头取出一个ID
// 原子操作: 获取项目并立即从队列和存储中删除
id := f.queue[0]
f.queue = f.queue[1:]
depth := len(f.queue)
// 更新初始填充计数
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
// 获取对应的事件列表
item, ok := f.items[id]
if !ok {
// 异常情况
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
continue
}
// 从存储中删除该项
delete(f.items, id)
// 性能追踪
if depth > 10 {
trace := utiltrace.New("DeltaFIFO Pop Process",
utiltrace.Field{Key: "ID", Value: id},
utiltrace.Field{Key: "Depth", Value: depth},
utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
defer trace.LogIfLong(100 * time.Millisecond)
}
// 处理事件
err := process(item, isInInitialList) // 回调设计: 通过函数参数注入处理逻辑
if e, ok := err.(ErrRequeue); ok {
// 处理器请求重新入队
f.addIfNotPresent(id, item) // 重新入队机制: 支持处理失败时重新入队
err = e.Err
}
return item, err
}
}
Pop实际应用场景:
在 Controller 模式中:
for {
// 从 DeltaFIFO 获取下一个要处理的项目
obj, err := queue.Pop(func(obj interface{}, isInInitialList bool) error {
// 处理从队列中弹出的项目
deltas := obj.(cache.Deltas)
// ... 处理逻辑 ...
return nil
})
// ... 错误处理 ...
}
Pop 方法是 Kubernetes 控制器模式中的核心环节,它确保了资源变更能够可靠、有序地被消费和处理,是整个事件驱动系统的关键组件。
func (f *DeltaFIFO) hasSynced_locked() bool {
return f.populated && f.initialPopulationCount == 0
}
hasSynced_locked() 用于判断 DeltaFIFO 队列是否已经完成初始同步,返回 true 表示"已完成初始同步"。要满足"已同步"需要同时满足两个条件:
1.populated:
2.initialPopulationCount:
这个方法的主要作用是告诉消费者(通常是 Informer 或控制器):
当 Kubernetes 控制器启动时:
比如在 SharedInformer 中:
func (s *sharedIndexInformer) HasSynced() bool {
s.controller.Lock()
defer s.controller.Unlock()
return s.controller.hasSynced()
}
func (c *controller) hasSynced() bool {
return c.config.Queue.HasSynced() // 这里会调用 DeltaFIFO 的 HasSynced 方法
}
func (f *DeltaFIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
return f.hasSynced_locked() // 调用了 hasSynced_locked 方法
}
控制器会等待 HasSynced() 返回 true 后才开始正常工作循环,这确保了控制器决策基于完整的数据。
简单来说,这个方法就是一个"初始化完成"的标志,对于保证 Kubernetes 控制器的正确性至关重要。
DeltaFIFO 使用 sync.Cond
实现生产者-消费者模式:
Add/Update/Delete/Replace
等方法向队列添加事件,然后调用 f.cond.Broadcast()
通知消费者Pop
方法,如果队列为空则 f.cond.Wait()
阻塞等待,直到有新事件到来1.Reflector 作为生产者,将 API Server 的变更事件添加到 DeltaFIFO
2.Informer 内部的处理器作为消费者,调用 Pop 方法处理事件
3.完整链路
API Server --(List/Watch)--> Reflector --(Add/Update/Delete)--> DeltaFIFO --(Pop)--> Controller --(Process)--> 业务逻辑
DeltaFIFO 通过 FIFO 队列和事件变更记录(Delta)机制,实现了对 Kubernetes 资源对象变更的有序处理。关键特点包括:
DeltaFIFO 的设计体现了 Kubernetes 对事件处理的严谨性,确保了集群状态变更能够可靠地传递给各个控制器。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。