前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >分析kubernetes中的事件机制

分析kubernetes中的事件机制

原创
作者头像
silenceper
修改于 2020-03-09 03:08:14
修改于 2020-03-09 03:08:14
1.7K0
举报
文章被收录于专栏:技术成长之路技术成长之路

我们通过 kubectl describe [资源] 命令,可以在看到Event输出,并且经常依赖event进行问题定位,从event中可以分析整个POD的运行轨迹,为服务的客观测性提供数据来源,由此可见,event在Kubernetes中起着举足轻重的作用。

event展示
event展示

event并不只是kubelet中都有的,关于event的操作被封装在client-go/tools/record包,我们完全可以在写入自定义的event。

现在让我们来一步步揭开event的面纱。

<a name="EBJdC"></a>

Event定义

其实event也是一个资源对象,并且通过apiserver将event存储在etcd中,所以我们也可以通过 kubectl get event 命令查看对应的event对象。

以下是一个event的yaml文件:

代码语言:txt
AI代码解释
复制
apiVersion: v1
count: 1
eventTime: null
firstTimestamp: "2020-03-02T13:08:22Z"
involvedObject:
  apiVersion: v1
  kind: Pod
  name: example-foo-d75d8587c-xsf64
  namespace: default
  resourceVersion: "429837"
  uid: ce611c62-6c1a-4bd8-9029-136a1adf7de4
kind: Event
lastTimestamp: "2020-03-02T13:08:22Z"
message: Pod sandbox changed, it will be killed and re-created.
metadata:
  creationTimestamp: "2020-03-02T13:08:30Z"
  name: example-foo-d75d8587c-xsf64.15f87ea1df862b64
  namespace: default
  resourceVersion: "479466"
  selfLink: /api/v1/namespaces/default/events/example-foo-d75d8587c-xsf64.15f87ea1df862b64
  uid: 9fe6f72a-341d-4c49-960b-e185982d331a
reason: SandboxChanged
reportingComponent: ""
reportingInstance: ""
source:
  component: kubelet
  host: minikube
type: Normal

主要字段说明:**

  • involvedObject: 触发event的资源类型
  • lastTimestamp:最后一次触发的时间
  • message:事件说明
  • metadata :event的元信息,name,namespace等
  • reason:event的原因
  • source:上报事件的来源,比如kubelet中的某个节点
  • type:事件类型,Normal或Warning

event字段定义可以看这里:types.go#L5078

接下来我们来看看,整个event是如何下入的。

<a name="MUouw"></a>

写入事件

1、这里以kubelet为例,看看是如何进行事件写入的

2、文中代码以Kubernetes 1.17.3为例进行分析

先以一幅图来看下整个的处理流程

event处理过程
event处理过程

创建操作事件的客户端: kubelet/app/server.go#L461

代码语言:txt
AI代码解释
复制
// makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
	if kubeDeps.Recorder != nil {
		return
	}
    //事件广播
	eventBroadcaster := record.NewBroadcaster()
    //创建EventRecorder
	kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
	//发送event至log输出
    eventBroadcaster.StartLogging(klog.V(3).Infof)
	if kubeDeps.EventClient != nil {
		klog.V(4).Infof("Sending events to api server.")
        //发送event至apiserver
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
	} else {
		klog.Warning("No api server defined - no events will be sent to API server.")
	}
}

通过 makeEventRecorder 创建了 EventRecorder 实例,这是一个事件广播器,通过它提供了StartLogging和StartRecordingToSink两个事件处理函数,分别将event发送给log和apiserver。 NewRecorder创建了 EventRecorder 的实例,它提供了 EventEventf 等方法供事件记录。

<a name="3klAc"></a>

EventBroadcaster

我们来看下EventBroadcaster接口定义:event.go#L113

代码语言:txt
AI代码解释
复制
// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {
    //
	StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface
	StartRecordingToSink(sink EventSink) watch.Interface
	StartLogging(logf func(format string, args ...interface{})) watch.Interface
	NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder

	Shutdown()
}

具体实现是通过 eventBroadcasterImpl struct来实现了各个方法。

其中StartLogging 和 StartRecordingToSink 其实就是完成了对事件的消费,EventRecorder实现对事件的写入,中间通过channel实现了生产者消费者模型。

<a name="uX4b1"></a>

EventRecorder

我们先来看下EventRecorder 接口定义:event.go#L88,提供了一下4个方法

代码语言:txt
AI代码解释
复制
// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
	// Event constructs an event from the given information and puts it in the queue for sending.
	// 'object' is the object this event is about. Event will make a reference-- or you may also
	// pass a reference to the object directly.
	// 'type' of this event, and can be one of Normal, Warning. New types could be added in future
	// 'reason' is the reason this event is generated. 'reason' should be short and unique; it
	// should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
	// to automate handling of events, so imagine people writing switch statements to handle them.
	// You want to make that easy.
	// 'message' is intended to be human readable.
	//
	// The resulting event will be created in the same namespace as the reference object.
	Event(object runtime.Object, eventtype, reason, message string)

	// Eventf is just like Event, but with Sprintf for the message field.
	Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})

	// PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
	PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})

	// AnnotatedEventf is just like eventf, but with annotations attached
	AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}

主要参数说明:

  • object 对应event资源定义中的 involvedObject
  • eventtype 对应event资源定义中的type,可选Normal,Warning.
  • reason :事件原因
  • message :事件消息

我们来看下当我们调用 Event(object runtime.Object, eventtype, reason, message string) 的整个过程。 发现最终都调用到了 generateEvent 方法:event.go#L316

代码语言:txt
AI代码解释
复制
func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
    .....
	event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
	event.Source = recorder.source
	go func() {
		// NOTE: events should be a non-blocking operation
		defer utilruntime.HandleCrash()
		recorder.Action(watch.Added, event)
	}()
}

最终事件在一个 goroutine 中通过调用 recorder.Action 进入处理,这里保证了每次调用event方法都是非阻塞的。 其中 makeEvent 的作用主要是构造了一个event对象,事件name根据InvolvedObject中的name加上时间戳生成:

注意看:对于一些非namespace资源产生的event,event的namespace是default

代码语言:txt
AI代码解释
复制
func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
	t := metav1.Time{Time: recorder.clock.Now()}
	namespace := ref.Namespace
	if namespace == "" {
		namespace = metav1.NamespaceDefault
	}
	return &v1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:        fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
			Namespace:   namespace,
			Annotations: annotations,
		},
		InvolvedObject: *ref,
		Reason:         reason,
		Message:        message,
		FirstTimestamp: t,
		LastTimestamp:  t,
		Count:          1,
		Type:           eventtype,
	}
}

进一步跟踪Action方法,apimachinery/blob/master/pkg/watch/mux.go#L188:23

代码语言:txt
AI代码解释
复制
// Action distributes the given event among all watchers.
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
	m.incoming <- Event{action, obj}
}

将event写入到了一个channel里面。 注意: 这个Action方式是apimachinery包中的方法,因为实现的sturt recorderImpl*watch.Broadcaster 作为一个匿名struct,并且在 NewRecorder 进行 Broadcaster 赋值,这个Broadcaster其实就是 eventBroadcasterImpl 中的Broadcaster

到此,基本清楚了event最终被写入到了 Broadcaster 中的 incoming channel中,下面看下是怎么进行消费的。

<a name="Mwa4D"></a>

消费事件

makeEventRecorder 调用的 StartLoggingStartRecordingToSink 其实就是完成了对事件的消费。

  • StartLogging直接将event输出到日志
  • StartRecordingToSink将事件写入到apiserver

两个方法内部都调用了 StartEventWatcher 方法,并且传入一个 eventHandler 方法对event进行处理

代码语言:txt
AI代码解释
复制
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
	watcher := e.Watch()
	go func() {
		defer utilruntime.HandleCrash()
		for watchEvent := range watcher.ResultChan() {
			event, ok := watchEvent.Object.(*v1.Event)
			if !ok {
				// This is all local, so there's no reason this should
				// ever happen.
				continue
			}
			eventHandler(event)
		}
	}()
	return watcher
}

其中 watcher.ResultChan 方法就拿到了事件,这里是在一个goroutine中通过func (m *Broadcaster) loop() ==>func (m *Broadcaster) distribute(event Event) 方法调用将event又写入了broadcasterWatcher.result

主要看下 StartRecordingToSink 提供的的eventHandlerrecordToSink 方法:

代码语言:txt
AI代码解释
复制
func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) {
	// Make a copy before modification, because there could be multiple listeners.
	// Events are safe to copy like this.
	eventCopy := *event
	event = &eventCopy
	result, err := eventCorrelator.EventCorrelate(event)
	if err != nil {
		utilruntime.HandleError(err)
	}
	if result.Skip {
		return
	}
	tries := 0
	for {
		if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
			break
		}
		tries++
		if tries >= maxTriesPerEvent {
			klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
			break
		}
		// Randomize the first sleep so that various clients won't all be
		// synced up if the master goes down.
        // 第一次重试增加随机性,防止 apiserver 重启的时候所有的事件都在同一时间发送事件
		if tries == 1 {
			time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64()))
		} else {
			time.Sleep(sleepDuration)
		}
	}
}

其中event被经过了一个 eventCorrelator.EventCorrelate(event) 方法做预处理,主要是聚合相同的事件(避免产生的事件过多,增加 etcd 和 apiserver 的压力,也会导致查看 pod 事件很不清晰)

下面一个for循环就是在进行重试,最大重试次数是12次,调用 recordEvent 方法才真正将event写入到了apiserver。

<a name="QwCYz"></a>

事件处理

我们来看下EventCorrelate方法:

代码语言:txt
AI代码解释
复制
// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
	if newEvent == nil {
		return nil, fmt.Errorf("event is nil")
	}
	aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
	observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
	if c.filterFunc(observedEvent) {
		return &EventCorrelateResult{Skip: true}, nil
	}
	return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}

分别调用了 aggregator.EventAggregatelogger.eventObservefilterFunc 三个方法,分别作用是:

  1. aggregator.EventAggregate:聚合event,如果在最近 10 分钟出现过 10 个相似的事件(除了 message 和时间戳之外其他关键字段都相同的事件),aggregator 会把它们的 message 设置为 (combined from similar events)+event.Message
  2. logger.eventObserve:它会把相同的事件以及包含 aggregator 被聚合了的相似的事件,通过增加 Count 字段来记录事件发生了多少次。
  3. filterFunc: 这里实现了一个基于令牌桶的限流算法,如果超过设定的速率则丢弃,保证了apiserver的安全。

我们主要来看下aggregator.EventAggregate方法:

代码语言:txt
AI代码解释
复制
func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
	now := metav1.NewTime(e.clock.Now())
	var record aggregateRecord
	// eventKey is the full cache key for this event
    //eventKey 是将除了时间戳外所有字段结合在一起
	eventKey := getEventKey(newEvent)
	// aggregateKey is for the aggregate event, if one is needed.
    //aggregateKey 是除了message和时间戳外的字段结合在一起,localKey 是message
	aggregateKey, localKey := e.keyFunc(newEvent)

	// Do we have a record of similar events in our cache?
	e.Lock()
	defer e.Unlock()
    //从cache中根据aggregateKey查询是否存在,如果是相同或者相类似的事件会被放入cache中
	value, found := e.cache.Get(aggregateKey)
	if found {
		record = value.(aggregateRecord)
	}

    //判断上次事件产生的时间是否超过10分钟,如何操作则重新生成一个localKeys集合(集合中存放message)
	maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
	interval := now.Time.Sub(record.lastTimestamp.Time)
	if interval > maxInterval {
		record = aggregateRecord{localKeys: sets.NewString()}
	}

	// Write the new event into the aggregation record and put it on the cache
    //将locakKey也就是message放入集合中,如果message相同就是覆盖了
	record.localKeys.Insert(localKey)
	record.lastTimestamp = now
	e.cache.Add(aggregateKey, record)

	// If we are not yet over the threshold for unique events, don't correlate them
	//判断localKeys集合中存放的类似事件是否超过10个,
    if uint(record.localKeys.Len()) < e.maxEvents {
		return newEvent, eventKey
	}

	// do not grow our local key set any larger than max
	record.localKeys.PopAny()

	// create a new aggregate event, and return the aggregateKey as the cache key
	// (so that it can be overwritten.)
	eventCopy := &v1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
			Namespace: newEvent.Namespace,
		},
		Count:          1,
		FirstTimestamp: now,
		InvolvedObject: newEvent.InvolvedObject,
		LastTimestamp:  now,
        //这里会对message加个前缀:(combined from similar events):
		Message:        e.messageFunc(newEvent),
		Type:           newEvent.Type,
		Reason:         newEvent.Reason,
		Source:         newEvent.Source,
	}
	return eventCopy, aggregateKey
}

aggregator.EventAggregate方法中其实就是判断了通过cache和localKeys判断事件是否相似,如果最近 10 分钟出现过 10 个相似的事件就合并并加上前缀,后续通过logger.eventObserve方法进行count累加,如果message也相同,肯定就是直接count++。

<a name="yEk1J"></a>

总结

好了,event处理的整个流程基本就是这样,我们可以概括一下,可以结合文中的图对比一起看下:

  1. 创建 EventRecorder 对象,通过其提供的 Event 等方法,创建好event对象
  2. 将创建出来的对象发送给 EventBroadcaster 中的channel中
  3. EventBroadcaster 通过后台运行的goroutine,从管道中取出事件,并广播给提前注册好的handler处理
  4. 当输出log的handler收到事件就直接打印事件
  5. EventSink handler收到处理事件就通过预处理之后将事件发送给apiserver
  6. 其中预处理包含三个动作,1、限流 2、聚合 3、计数
  7. apiserver收到事件处理之后就存储在etcd中

回顾event的整个流程,可以看到event并不是保证100%事件写入(从预处理的过程来看),这样做是为了后端服务etcd的可用性,因为event事件在整个集群中产生是非常频繁的,尤其在服务不稳定的时候,而相比Deployment,Pod等其他资源,又没那么的重要。所以这里做了个取舍。

参考文档:

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
Python 列表、字典、元组的一些小技巧
我们知道 Python 的内置 dictionary 数据类型是无序的,通过 key 来获取对应的 value。可是有时我们需要对 dictionary 中的 item 进行排序输出,可能根据 key,也可能根据 value 来排。到底有多少种方法可以实现对 dictionary 的内容进行排序输出呢?下面摘取了使用 sorted 函数实现对 dictionary 的内容进行排序输出一些精彩的解决办法。
章鱼猫先生
2021/10/15
1.2K0
Python 列表、字典、元组的一些小技巧
python3字典的排序
通一表述:字典有两个参数,key, value,下面所描述,键:key,值:value
全栈程序员站长
2022/09/13
4300
python 对字典"排序"总结
对字典进行排序?这其实是一个伪命题,搞清楚python字典的定义---字典本身默认以key的字符顺序输出显示---就像我们用的真实的字典一样,按照abcd字母的顺序排列,并且本质上各自没有先后关系,是一个哈希表的结构:
用户5745385
2019/07/04
5.6K0
python 对字典"排序"总结
python字典排序
利用Python 内置函数 sorted 对字典的键或者值进行排序,首先来了解下 sorted 函数
雷子
2024/03/25
1300
python字典排序
Python字典按键/值排序的几种方法
同理,如果我们只需要对sort_by_value稍微修改一下,就可以得到按值排序的结果:
宇宙之一粟
2020/10/26
5.8K0
Python 列表排序
本文最先发布在: https://www.itcoder.tech/posts/python-list-sort/
雪梦科技
2020/06/05
1.8K0
python中列表排序,字典排序,列表中的字典排序
key= lambda dict1:dict1[0] #dict1[0]表示按键,dict1[1]表示按值。
用户8346838
2021/03/10
9.7K0
python字典的合并排序添加查询
python 中映射类型里key和value是一种一对多的关系,通常被认为是一种可变的哈希表。字典对象是可变的,它是一个容器类型,能存储任意个数的Python对象,也可存储其他容器类型。
Tim在路上
2020/08/04
1K0
python dict的list排序
对于简单的list排序,直接调用内建函数就可以了,但是对于dict的list排序就没有那么直接了,不过,还是有很简洁的办法的,如:
py3study
2020/01/14
1.9K0
分享几个简单易懂的Python技巧,能够极大的提高工作效率哦!
大家好,我是俊欣,今天和大家来分享几个关于Python的小技巧,都是非常简单易懂的内容,希望大家看了之后能够有所收获。
用户6888863
2021/07/19
4180
Python 学习入门(11)—— 排序
Python的内置dictionary数据类型是无序的,通过key来获取对应的value。可是有时我们需要对dictionary中 的item进行排序输出,可能根据key,也可能根据value来排。
阳光岛主
2019/02/19
4120
Python 进阶编程之字典的高级用法
一个字典就是一个键对应一个单值的映射,而上面的列表中有相同键。如果你想要一个键映射多个值,那么就需要将这多个值放到另外的序列中,比如 list 或者 set 里面,像下面这样:
Crossin先生
2019/11/10
1.3K0
Python这10个字典操作你必须知道
不要使用 key in d.keys(), 这是一种画蛇添足的操作,因为d.keys()会返回一个新的列表对象,导致内存最大。
IT阅读排行榜
2019/05/14
4860
python list排序
python 列表list中内置了一个十分有用的排序函数sort,sorted,它可以用于列表的排序,以下是例子。
py3study
2020/01/07
1.2K0
Python字典不是不可以排序,是你方法没用对!
摘要:排序是个古老的话题,不过对于字典的排序,常常会让 小白手足无措。好像没有找到可以排序字典的函数呢!到底是按key排序,还是按value排序呢?字典到底可不可以按value排序呢?排完序后,还可以通过key检索吗?当然,还会抛出很多问题,而本文将完美地给出了这些问题的答案!
蒙娜丽宁
2021/02/19
1.1K0
Python-排序-01-字典排序
系统:Windows 7 语言版本:Anaconda3-4.3.0.1-Windows-x86_64 编辑器:pycharm-community-2016.3.2
zishendianxia
2019/10/23
8960
Python-排序-01-字典排序
python3的sorted和sort
python内置排序函数sorted,可以适用于所有可迭代的对象。而类型自有的sort函数只适用于类型本身。例如list.sort(),只适用于列表类型。另外,sort函数是在原来列表上直接进行排序,而sorted函数则是返回一个排序之后的列表。
zy010101
2022/05/05
3830
python中列表的排序方法操作总结分享
简单记一下python中List的sort方法(或者sorted内建函数)的用法。
我叫什么好
2022/01/08
8420
Python3学习笔记 | 七、Python的类型与运算-字典
字典在Python里是无序集合对象类型,字典的值都有独立的唯一的键(Key),用相应的键来取值。
TeamsSix
2019/09/24
7190
python 字典一些常见的魔法方法以及遇到的面试题
dict 类型不但在各种程序里广泛使用,它也是 Python 语言的基石。模块的命名空间、实例的属性和函数的关键字参数中都可以看到字典的身影。跟它有关的内置函数都在__builtins__.__dict__模块中。正是因为字典至关重要,Python 对它的实现做了高度优化,而散列表则是字典类型性能出众的根本原因。
用户4945346
2020/07/20
7610
相关推荐
Python 列表、字典、元组的一些小技巧
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档