首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >k8s client-go 的 DeltaFIFO 源码分析

k8s client-go 的 DeltaFIFO 源码分析

原创
作者头像
rxg456
发布2025-03-11 21:35:55
发布2025-03-11 21:35:55
1790
举报

本文阅读代码链接:https://github.com/kubernetes/client-go/tree/release-1.30

下面代码全部在 tools/cache/delta_fifo.go 文件中:

一. DeltaFIFO 的基本概念

DeltaFIFO 是 client-go 中的核心队列组件,作为 Reflector 和后续消费者之间的桥梁,负责存储资源对象的变更事件并确保这些事件按顺序被处理。其名称包含两个重要概念:

  1. Delta:表示资源对象的变化记录(增加、更新、删除等)
  2. FIFO:先进先出队列,保证事件按顺序处理

二. DeltaFIFO 的主要功能

DeltaFIFO 解决以下用例:

  1. 确保每个对象变更最多处理一次
  2. 处理对象时,能够看到自上次处理以来发生的所有变化
  3. 处理对象的删除事件
  4. 支持定期重新处理对象

三. 结构体设计

核心结构体

代码语言:go
复制
type DeltaFIFO struct {
    // 锁和条件变量
    lock sync.RWMutex
    cond sync.Cond
    
    // 存储数据的主要字段
    items map[string]Deltas     // 键到 Deltas 列表的映射
    queue []string              // 保持 FIFO 顺序的键列表
    
    // 同步状态追踪
    populated bool
    initialPopulationCount int
    
    // 其他状态字段
    keyFunc KeyFunc              // 从对象获取键的函数 
    knownObjects KeyListerGetter // 已知对象存储,用于对比
    closed bool
    emitDeltaTypeReplaced bool
    transformer TransformFunc
}

Delta 和 Deltas

代码语言:go
复制
// Delta 表示一个变化事件,包含变化类型和对象本身
type Delta struct {
    Type   DeltaType       // 变化类型
    Object interface{}     // 变化后的对象(删除事件则是删除前的最终状态)
}

// Deltas 是一个对象的多个 Delta 的列表,按时间顺序排列
type Deltas []Delta

变化类型

代码语言:go
复制
type DeltaType string

const (
    Added    DeltaType = "Added"    // 新增
    Updated  DeltaType = "Updated"  // 更新
    Deleted  DeltaType = "Deleted"  // 删除
    Replaced DeltaType = "Replaced" // 替换(由于 watch 错误导致重新列举时)
    Sync     DeltaType = "Sync"     // 同步(定期重新同步时)
)

四. 事件排队机制

1.核心入队方法-queueActionLocked

所有事件入队操作最终都调用 queueActionLocked 方法:

  • 增量事件跟踪:每个对象保留完整的变更历史
  • 高效队列管理:通过键去重和有序队列确保处理效率
  • 生产者-消费者模型:入队操作触发条件变量,唤醒消费者
  • 一致性保证:锁保护、队列不变量和事件顺序保证
  • 灵活的转换机制:支持对象在入队前定制化处理
代码语言:go
复制
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    // 获取对象键
    // 重要性:整个队列围绕对象键构建,而非对象本身
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }

    // 转换对象(如果设置了transformer)
    // 灵活性:允许在入队前修改或过滤对象
    if f.transformer != nil {
        obj, err = f.transformer(obj)
        if err != nil {
            return err
        }
    }
    // 核心设计:每个键对应一个 Deltas 列表,记录所有历史变更
    // 获取已有的事件列表
    oldDeltas := f.items[id]
    // 添加新事件
    newDeltas := append(oldDeltas, Delta{actionType, obj})
    // 事件去重
    newDeltas = dedupDeltas(newDeltas)

    // 将新的事件列表存储并通知消费者
    if len(newDeltas) > 0 {
        if _, exists := f.items[id]; !exists {
            // 如果是新对象,将其ID加入队列
            f.queue = append(f.queue, id)
        }
        f.items[id] = newDeltas
        f.cond.Broadcast()  // 通知等待的消费者
    } else {
        // 异常状态处理,理论上不可能的路径
		if oldDeltas == nil {
			klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
			return nil
		}
		klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
		f.items[id] = newDeltas
		return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
	}
    
    return nil
}

队列维护与通知:

  • 双重数据结构:

items:键到 Deltas 列表的映射(内容存储)

queue:保持 FIFO 顺序的键列表(顺序保证)

  • 并发控制:使用条件变量通知等待的消费者
  • 一致性保证:键只在首次添加时入队,确保每个对象最多出现一次
代码语言:go
复制
if _, exists := f.items[id]; !exists {
    f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()

2.事件入队的几种途径

DeltaFIFO 提供了多种方式将事件入队:

(1) Add - 添加事件
代码语言:go
复制
func (f *DeltaFIFO) Add(obj interface{}) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true
    return f.queueActionLocked(Added, obj)
}
(2) Update - 更新事件
代码语言:go
复制
func (f *DeltaFIFO) Update(obj interface{}) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true
    return f.queueActionLocked(Updated, obj)
}
(3) Delete - 删除事件
代码语言:go
复制
func (f *DeltaFIFO) Delete(obj interface{}) error {
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true
    
    // 特殊逻辑:如果对象已不存在,则跳过删除
    if f.knownObjects == nil {
        if _, exists := f.items[id]; !exists {
            return nil  // 对象已不存在,无需再次报告删除
        }
    } else {
        // 更复杂的检查逻辑
        _, exists, err := f.knownObjects.GetByKey(id)
        _, itemsExist := f.items[id]
        if err == nil && !exists && !itemsExist {
            return nil  // 对象在索引和队列中都不存在,跳过
        }
    }

    return f.queueActionLocked(Deleted, obj)
}
(4) Replace - 批量替换

使用场景:

  1. 最主要的使用场景是在 Reflector 首次启动时:
代码语言:go
复制
// ListAndWatch 内部调用
err = r.list(stopCh)  // List 操作获取全量数据
// ...在 list 方法中会调用
return r.store.Replace(found, resourceVersion)  // 其中 r.store 就是 DeltaFIFO
// ...在 watchList 方法中也会调用
err = r.store.Replace(temporaryStore.List(), resourceVersion)
  1. 当与 API Server 的 Watch 连接中断时,需要重新建立连接。也会调用Replace
  2. 强制重新同步场景
代码语言:go
复制
func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    keys := make(sets.String, len(list))

    // 决定使用哪种事件类型
    action := Sync
    if f.emitDeltaTypeReplaced {
        action = Replaced
    }

    // 将新列表中的所有对象加入队列
    for _, item := range list {
        key, err := f.KeyOf(item)
        if err != nil {
            return KeyError{item, err}
        }
        keys.Insert(key)
        if err := f.queueActionLocked(action, item); err != nil {
            return fmt.Errorf("couldn't enqueue object: %v", err)
        }
    }

    // 标记未在新列表中的对象为删除
    queuedDeletions := 0
    // 1. 检查队列中的对象
    for k, oldItem := range f.items {
        if keys.Has(k) {
            continue
        }
        // 找出最新状态用于生成删除事件
        var deletedObj interface{}
        if n := oldItem.Newest(); n != nil {
            deletedObj = n.Object
            // 处理特殊的删除状态对象
            if d, ok := deletedObj.(DeletedFinalStateUnknown); ok {
                deletedObj = d.Obj
            }
        }
        queuedDeletions++
        if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
            return err
        }
    }

    // 2. 检查已知对象存储中的对象
    if f.knownObjects != nil {
        // ... 略
    }

    // 初始化填充计数,用于判断同步状态
    if !f.populated {
        f.populated = true
        f.initialPopulationCount = keys.Len() + queuedDeletions
    }

    return nil
}

Replace 方法做三件重要的事:

  1. 添加新对象:将提供的对象列表中的每个对象以 Sync 或 Replaced 类型添加到队列
  2. 标记删除的对象:对于已经在队列或已知对象中但不在新列表中的对象,标记为删除
  3. 设置同步状态:初始化 populated 和 initialPopulationCount,用于追踪同步进度
(5) Resync - 定期重新同步

最常见的使用场景是 Informer 中的定期重新同步机制

Resync 用于解决以下情况:

  • 控制器的本地状态可能与期望状态不同步
  • 某些更新事件可能被错过(网络问题、缓存bug等)
  • 控制器的逻辑依赖于周期性处理所有对象
代码语言:go
复制
// Resync 将 f.knownObjects 中列出的每个对象都添加到队列中,添加时使用 Sync 类型的 Delta
// 但只处理那些尚未在处理队列中的对象键
// 如果 f.knownObjects 为 nil,则 Resync 不做任何操作
func (f *DeltaFIFO) Resync() error {
    // 获取锁以保护对 DeltaFIFO 内部结构的访问
	f.lock.Lock()
	defer f.lock.Unlock()
	// 如果没有配置 knownObjects 存储,则无需执行同步
	if f.knownObjects == nil {
		return nil
	}
	// 获取已知对象存储中的所有键
	keys := f.knownObjects.ListKeys()
    // 遍历每个键,将其加入处理队列
	for _, k := range keys {
		if err := f.syncKeyLocked(k); err != nil {
			return err
		}
	}
	return nil
}

Resync 的具体工作流程:

  1. 获取所有已知对象的键
  2. 对每个键调用 syncKeyLocked
  3. syncKeyLocked 会检查对象是否已在队列中
  4. 如果不在队列中,则以 Sync 类型的 Delta 将对象添加到队列
代码语言:go
复制
// syncKeyLocked 尝试将指定键对应的对象添加到处理队列
// 该方法假设调用者已经持有锁
func (f *DeltaFIFO) syncKeyLocked(key string) error {
    // 从已知对象存储中获取对象
	obj, exists, err := f.knownObjects.GetByKey(key)
	if err != nil {
        // 如果获取对象时发生错误,记录日志但不中断整个同步过程
		klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
		return nil
	} else if !exists {
        // 如果对象不存在于已知对象存储中,记录信息并跳过
		klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
		return nil
	}

    // 重要优化:如果我们正在执行 Resync(),且该对象已经有事件在队列中等待处理,
    // 则忽略该对象的 Resync。这是为了避免竞争条件:重新同步可能使用对象的旧值,
    // 因为将事件加入队列不会触发底层存储 <knownObjects> 的更新。
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
    // 检查对象是否已在队列中
	if len(f.items[id]) > 0 {
		return nil
	}
	// 将对象以 Sync 类型添加到队列
	if err := f.queueActionLocked(Sync, obj); err != nil {
		return fmt.Errorf("couldn't queue object: %v", err)
	}
	return nil
}
操作的区别
  • Add/Update/Delete: 处理增量变更,通常由 Watch 操作触发
  • Replace: 用于处理整个列表替换,通常在 List 操作后调用
  • Resync: 周期性重新处理已知对象,即使对象没有变化

3.事件去重机制

每当新的事件被添加时,DeltaFIFO 会尝试去重:

代码语言:go
复制
func dedupDeltas(deltas Deltas) Deltas {
    n := len(deltas)
    if n < 2 {
        return deltas
    }
    a := &deltas[n-1]  // 最新事件
    b := &deltas[n-2]  // 次新事件
    if out := isDup(a, b); out != nil {
        deltas[n-2] = *out  // 合并重复事件
        return deltas[:n-1]  // 返回缩减后的列表
    }
    return deltas
}

func isDup(a, b *Delta) *Delta {
    // 目前只处理删除事件的去重
    if out := isDeletionDup(a, b); out != nil {
        return out
    }
    return nil
}

func isDeletionDup(a, b *Delta) *Delta {
    if b.Type != Deleted || a.Type != Deleted {
        return nil  // 只有两个都是删除事件才考虑去重
    }
    // 保留包含更多信息的删除事件
    if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
        return a  // 保留普通删除事件
    }
    return b  // 保留 DeletedFinalStateUnknown 事件
}

五. 事件处理机制

1.消费事件的核心方法-Pop

关键设计思想

  1. 消费者负责失败处理:从队列中移除后,处理失败需显式重新入队
  2. 锁区域最小化:虽然持有锁调用处理函数,但文档明确建议避免耗时操作
  3. 全有或全无语义:要么完全处理项目,要么完全不处理(通过重新入队)
  4. 关注点分离:队列管理与业务逻辑处理分离
代码语言:go
复制
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        // 阻塞等待机制
        // 等待队列有数据
        for len(f.queue) == 0 {
            if f.closed {
                return nil, ErrFIFOClosed
            }
			if f.closed {
				return nil, ErrFIFOClosed
			}

            f.cond.Wait()  // 阻塞等待生产者唤醒
        }
        
        // 判断是否在初始同步列表中
        isInInitialList := !f.hasSynced_locked()
        
        // 从队列头取出一个ID
        // 原子操作: 获取项目并立即从队列和存储中删除
        id := f.queue[0]
        f.queue = f.queue[1:]
        depth := len(f.queue)
        
        // 更新初始填充计数
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        
        // 获取对应的事件列表
        item, ok := f.items[id]
        if !ok {
            // 异常情况
            klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
            continue
        }
        
        // 从存储中删除该项
        delete(f.items, id)
        
        // 性能追踪
        if depth > 10 {
            trace := utiltrace.New("DeltaFIFO Pop Process",
                utiltrace.Field{Key: "ID", Value: id},
                utiltrace.Field{Key: "Depth", Value: depth},
                utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
            defer trace.LogIfLong(100 * time.Millisecond)
        }
        
        // 处理事件
        err := process(item, isInInitialList) // 回调设计: 通过函数参数注入处理逻辑
        if e, ok := err.(ErrRequeue); ok {
            // 处理器请求重新入队
            f.addIfNotPresent(id, item) // 重新入队机制: 支持处理失败时重新入队
            err = e.Err
        }
        
        return item, err
    }
}

Pop实际应用场景:

在 Controller 模式中:

代码语言:go
复制
for {
    // 从 DeltaFIFO 获取下一个要处理的项目
    obj, err := queue.Pop(func(obj interface{}, isInInitialList bool) error {
        // 处理从队列中弹出的项目
        deltas := obj.(cache.Deltas)
        // ... 处理逻辑 ...
        return nil
    })
    // ... 错误处理 ...
}

Pop 方法是 Kubernetes 控制器模式中的核心环节,它确保了资源变更能够可靠、有序地被消费和处理,是整个事件驱动系统的关键组件。

2.判断同步状态-hasSynced_locked

代码语言:go
复制
func (f *DeltaFIFO) hasSynced_locked() bool {
	return f.populated && f.initialPopulationCount == 0
}

hasSynced_locked() 用于判断 DeltaFIFO 队列是否已经完成初始同步,返回 true 表示"已完成初始同步"。要满足"已同步"需要同时满足两个条件:

  1. f.populated == true:队列已被填充数据
  2. f.initialPopulationCount == 0:初始数据已全部处理完毕
这两个变量是如何设置的

1.populated:

  • 在首次调用 Add/Update/Delete 时设置为 true
  • 或者在 Replace 方法执行时设置为 true

2.initialPopulationCount:

  • 在首次调用 Replace 时,设置为"新列表中对象数量 + 被标记为删除的对象数量"
  • 每当 Pop 方法取出一个元素时,如果 initialPopulationCount > 0,则减少 1
hasSynced_locked作用

这个方法的主要作用是告诉消费者(通常是 Informer 或控制器):

  • 是否已经处理完 List 操作返回的所有初始对象
  • 是否可以开始依赖这个缓存来做决策

当 Kubernetes 控制器启动时:

  1. 首先执行 List 操作获取所有对象(对应 Replace 调用)
  2. 同时启动 Watch 操作监听后续变化(对应 Add/Update/Delete 调用)
  3. 控制器需要等待初始数据全部处理完毕,才能确保后续的增量更新是基于完整状态的
hasSynced_locked应用示例

比如在 SharedInformer 中:

代码语言:go
复制
func (s *sharedIndexInformer) HasSynced() bool {
    s.controller.Lock()
    defer s.controller.Unlock()
    return s.controller.hasSynced()
}

func (c *controller) hasSynced() bool {
    return c.config.Queue.HasSynced()  // 这里会调用 DeltaFIFO 的 HasSynced 方法 
}

func (f *DeltaFIFO) HasSynced() bool {
	f.lock.Lock()
	defer f.lock.Unlock()
	return f.hasSynced_locked() // 调用了 hasSynced_locked 方法
}

控制器会等待 HasSynced() 返回 true 后才开始正常工作循环,这确保了控制器决策基于完整的数据。

简单来说,这个方法就是一个"初始化完成"的标志,对于保证 Kubernetes 控制器的正确性至关重要。

六. 生成者-消费者同步机制

DeltaFIFO 使用 sync.Cond 实现生产者-消费者模式:

  1. 生产者:调用 Add/Update/Delete/Replace 等方法向队列添加事件,然后调用 f.cond.Broadcast() 通知消费者
  2. 消费者:调用 Pop 方法,如果队列为空则 f.cond.Wait() 阻塞等待,直到有新事件到来

七. DeltaFIFO应用流程

1.Reflector 作为生产者,将 API Server 的变更事件添加到 DeltaFIFO

  • List 操作调用 Replace 方法
  • Watch 操作调用 Add/Update/Delete 方法

2.Informer 内部的处理器作为消费者,调用 Pop 方法处理事件

  • 将事件分发给注册的事件处理器
  • 更新本地缓存(Indexer)

3.完整链路

代码语言:txt
复制
   API Server --(List/Watch)--> Reflector --(Add/Update/Delete)--> DeltaFIFO --(Pop)--> Controller --(Process)--> 业务逻辑

八. 总结

DeltaFIFO 通过 FIFO 队列和事件变更记录(Delta)机制,实现了对 Kubernetes 资源对象变更的有序处理。关键特点包括:

  1. 完整的变更记录:每个资源对象的所有变更都按顺序保存
  2. 高效的事件去重:特别是对于删除事件的去重
  3. 生产者-消费者模式:通过条件变量实现生产者与消费者之间的同步
  4. 处理重新同步:通过 Replace 和 Resync 支持全量更新和定期重同步
  5. 跟踪同步状态:通过 populated 和 initialPopulationCount 追踪同步进度

DeltaFIFO 的设计体现了 Kubernetes 对事件处理的严谨性,确保了集群状态变更能够可靠地传递给各个控制器。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一. DeltaFIFO 的基本概念
  • 二. DeltaFIFO 的主要功能
  • 三. 结构体设计
    • 核心结构体
    • Delta 和 Deltas
    • 变化类型
  • 四. 事件排队机制
    • 1.核心入队方法-queueActionLocked
    • 2.事件入队的几种途径
      • (1) Add - 添加事件
      • (2) Update - 更新事件
      • (3) Delete - 删除事件
      • (4) Replace - 批量替换
      • (5) Resync - 定期重新同步
      • 操作的区别
    • 3.事件去重机制
  • 五. 事件处理机制
    • 1.消费事件的核心方法-Pop
    • 2.判断同步状态-hasSynced_locked
  • 六. 生成者-消费者同步机制
  • 七. DeltaFIFO应用流程
  • 八. 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档