上一篇文章里,我们主要介绍了和对象存储相关的组件 Store 接口以及它的实现结构体 cache,本质上说该接口和它的实现是对以前文章中介绍的 ThreadSafeStore 接口和它具体实现的更高级抽象,即 ThreadSafeStore 接口的操作需要针对资源对象以及对象的 key, 而 Store 接口有能力获取资源对象的 key, 所以该接口只针对资源对象操作。当然,两种组件针对资源对象的操作在底层上都是并发安全的。本篇文章中我们主要来介绍 Queue 和 DeltaFIFO 组件 ,也是资源对象存储组件。
Queue 接口
Queue 是接口,图解和源码如下:
//staging/src/k8s.io/client-go/tools/cache/fifo.go
type Queue interface {
Store
Pop(PopProcessFunc) (interface{}, error)
AddIfNotPresent(interface{}) error
HasSynced() bool
Close()
}
Delta 结构体
Delta 结构体定义资源对象的创建,更新,删除等操作的元数据信息,图解和源码如下:
//src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaType string
type Delta struct {
Type DeltaType
Object interface{}
}
type Deltas []Delta
const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
Replaced DeltaType = "Replaced"
Sync DeltaType = "Sync"
)
DeltaFIFO 结构体
DeltaFIFI 结构体实现了上面介绍的 Queue 接口,针对的元素都是 Delta 类型的对象,其图解和源码如下:
//src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
lock sync.RWMutex
cond sync.Cond
items map[string]Deltas
queue []string
populated bool
initialPopulationCount int
keyFunc KeyFunc
knownObjects KeyListerGetter
closed bool
emitDeltaTypeReplaced bool
}
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error){...}
func (f *DeltaFIFO) Add(obj interface{}) error{...}
func (f *DeltaFIFO) Update(obj interface{}) error{...}
func (f *DeltaFIFO) Delete(obj interface{}){...}
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error{...}
......//other methods impl defined in Store interface
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
if f.closed {
return nil, ErrFIFOClosed
}
f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
depth := len(f.queue)
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
continue
}
delete(f.items, id)
if depth > 10 {
trace := utiltrace.New("DeltaFIFO Pop Process",
utiltrace.Field{Key: "ID", Value: id},
utiltrace.Field{Key: "Depth", Value: depth},
utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
defer trace.LogIfLong(100 * time.Millisecond)
}
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
return item, err
}
}
DeltaFIFO 结构体的创建
DeltaFIFI 结构体创建的源码如下:
//src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFOOptions struct {
KeyFunction KeyFunc
KnownObjects KeyListerGetter
EmitDeltaTypeReplaced bool
}
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: keyFunc,
KnownObjects: knownObjects,
})
}
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
if opts.KeyFunction == nil {
opts.KeyFunction = MetaNamespaceKeyFunc
}
f := &DeltaFIFO{
items: map[string]Deltas{},
queue: []string{},
keyFunc: opts.KeyFunction,
knownObjects: opts.KnownObjects,
emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
}
f.cond.L = &f.lock
return f
}
// k8s.io/client-go/tools/cache/store.go
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
if key, ok := obj.(ExplicitKey); ok {
return string(key), nil
}
meta, err := meta.Accessor(obj)
if err != nil {
return "", fmt.Errorf("object has no meta: %v", err)
}
if len(meta.GetNamespace()) > 0 {
return meta.GetNamespace() + "/" + meta.GetName(), nil
}
return meta.GetName(), nil
}
目前我们先写到这里,在下一篇文章中我们继续来介绍 kubernetes 资源对象的 list and watch 机制。