前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >《一起读 kubernetes 源码》deployment 滚动更新是如何实现的

《一起读 kubernetes 源码》deployment 滚动更新是如何实现的

作者头像
LinkinStar
发布2024-07-13 15:00:54
1490
发布2024-07-13 15:00:54
举报
文章被收录于专栏:LinkinStar's Blog

前言

这一节终于来到了我们最为熟悉的一个对象 deployment,通常这可能是我们学习 k8s 接触的第一个大对象了,我们一般的应用也是以 deployment 来进行部署的,那么对于熟悉的它来说,我们应该从源码里面去找什么目标来看呢?对于我来说,deployment 的更新是我最好奇的,在我重新修改镜像版本之后,deployment 是如何一步步控制已有的 pod 进行更新的呢?这一节我们就从源码中揭秘这个过程。

前置知识

  • deployment 的基础使用
  • 滚动更新

心路历程

在我看来其他的属性与 pod 类似,而 deployment 作为一个 pod 的集合。那,为什么 deployment 要让 pod 的有多个副本呢?从最初的角度角度来说肯定是高可用了,所以 deployment 中最为关键的就是对 pod 的控制,也就是当 pod 的数量变化的时候,它是如何操作的。

码前提问

  1. deployment 是由哪个对象控制的?
  2. 应用更新的时候 deployment 是如何控制更新过程的?

源码分析

寻码过程

像 deployment 这样的源码比起其他就好找很多了,毕竟命名比较直接。在看来前几节之后,我不知道你是否发现了一个规律。通常看源码的正向思路可以被总结为:

  1. 找到对应实现的数据结构,通常是一个或多个结构体
  2. 看它的初始化,初始化能告诉你其中哪些必要的准备步骤,和具体一些字段的基础能力
  3. 看它的方法,通常就能知道你想要具体实现原理了

Deployment 结构

话不多说,先找到它的数据结构

代码语言:javascript
复制
// vendor/k8s.io/api/apps/v1/types.go:355
type Deployment struct {
	metav1.TypeMeta `json:",inline"`
	// Standard object's metadata.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
	// +optional
	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

	// Specification of the desired behavior of the Deployment.
	// +optional
	Spec DeploymentSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`

	// Most recently observed status of the Deployment.
	// +optional
	Status DeploymentStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}

可以看到,数据结构的定义和我们平常使用的 yaml 文件的定义是一一对应的,非常容易理解,可以简单浏览一下 Spec 的属性。

那么关键的问题来了,是哪个结构在控制 deployment?于是开始寻找 Deployment 的引用,看哪些位置在使用这个数据结构,引用很多,但是你只需要按文件去看就可以了。

阅读其他源码时,如果一个对象的引用,不要去寻找每一个代码引用的位置,而应该先从文件入手,如果引用的文件还是很多,可以从包的角度入手,一个包下通常能力方向也是类似的

于是,我找到了 DeploymentController 这个关键的对象(看命名也应该是它了,控制器嘛),今天我们后面就是围绕着它展开的。注意哦,

DeploymentController

DeploymentController 结构

还是类似的,先来看看结构

代码语言:javascript
复制
// pkg/controller/deployment/deployment_controller.go:66
type DeploymentController struct {
	// rsControl is used for adopting/releasing replica sets.
	rsControl controller.RSControlInterface
	client    clientset.Interface

	eventBroadcaster record.EventBroadcaster
	eventRecorder    record.EventRecorder

	// To allow injection of syncDeployment for testing.
	syncHandler func(ctx context.Context, dKey string) error
	// used for unit testing
	enqueueDeployment func(deployment *apps.Deployment)

	// dLister can list/get deployments from the shared informer's store
	dLister appslisters.DeploymentLister
	// rsLister can list/get replica sets from the shared informer's store
	rsLister appslisters.ReplicaSetLister
	// podLister can list/get pods from the shared informer's store
	podLister corelisters.PodLister

	// dListerSynced returns true if the Deployment store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	dListerSynced cache.InformerSynced
	// rsListerSynced returns true if the ReplicaSet store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	rsListerSynced cache.InformerSynced
	// podListerSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podListerSynced cache.InformerSynced

	// Deployments that need to be synced
	queue workqueue.RateLimitingInterface
}

注意两个点就好,一个是 syncHandler 还有一个是 queue 看到这两个字段我心里其实已经有个大概的思路了。下面就要用到我们在第一节提到的 informer 机制了。

NewDeploymentController

初始化是在 NewDeploymentController 方法中,我省略了其中一些部分,留下了重要的几个例子

代码语言:javascript
复制
// pkg/controller/deployment/deployment_controller.go:101
func NewDeploymentController(ctx context.Context, dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
	//....
	dc := &DeploymentController{
		//....
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
	}
	//....

	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			dc.addDeployment(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			dc.updateDeployment(logger, oldObj, newObj)
		},
		// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
		DeleteFunc: func(obj interface{}) {
			dc.deleteDeployment(logger, obj)
		},
	})
	//....

	dc.syncHandler = dc.syncDeployment
	dc.enqueueDeployment = dc.enqueue

	//....
	return dc, nil
}

有了前面的知识,这里的代码我们就很容易理解了,关键是在于注册了有个 ResourceEvent 处理的各种能力,比如当 Add 事件来的时候,调用 addDeployment。先留心注意下面的两个部分 syncHandlerenqueueDeployment 后面会用到。接下来我们肯定会好奇,addDeployment 究竟是如何处理这个事件的,所以我们继续深入看里面的实现。

addDeployment

这里面的调用链路很清晰:addDeployment -> enqueueDeployment -> enqueue -> dc.queue.Add(key)

代码语言:javascript
复制
// pkg/controller/deployment/deployment_controller.go:391
func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
	key, err := controller.KeyFunc(deployment)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
		return
	}

	dc.queue.Add(key)
}

其实这些处理的工作,将 deployment 对应的 key 丢到队列里面去,所以下面我们只需要找到哪里在处理队列中的消息就可以了

Run

地方也很好找,是在 Run 里面,运行的时候启动了一定数量的 worker,然后每个 worker 循环去取消息。

代码语言:javascript
复制
// pkg/controller/deployment/deployment_controller.go:157
// Run begins watching and syncing.
func (dc *DeploymentController) Run(ctx context.Context, workers int) {
	//...

	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, dc.worker, time.Second)
	}

	<-ctx.Done()
}
代码语言:javascript
复制
// pkg/controller/deployment/deployment_controller.go:473
func (dc *DeploymentController) worker(ctx context.Context) {
	for dc.processNextWorkItem(ctx) {
	}
}

func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool {
	key, quit := dc.queue.Get()
	if quit {
		return false
	}
	defer dc.queue.Done(key)

	err := dc.syncHandler(ctx, key.(string))
	dc.handleErr(ctx, err, key)

	return true
}

整个路径就是:Run -> worker -> processNextWorkItem -> syncHandler

可以看到就是一个标准的生产者消费者模型。然后关键就来到了 syncHandler 变量,还记得 dc.syncHandler = dc.syncDeployment 吗?对的,它在初始化时候被赋值为了 syncDeployment 这就到了我们这一节的重点方法了,注意看。

syncDeployment

这里我不想省略太多的代码,因为它本身是一个顺序结构,很容易理解。

代码语言:javascript
复制
// pkg/controller/deployment/deployment_controller.go:581
func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error {
	//...

	deployment, err := dc.dLister.Deployments(namespace).Get(name)
	if errors.IsNotFound(err) {
		logger.V(2).Info("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
		return nil
	}
	if err != nil {
		return err
	}

	// Deep-copy otherwise we are mutating our cache.
	// TODO: Deep-copy only when needed.
	d := deployment.DeepCopy()

	//...

	// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
	// through adoption/orphaning.
	rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
	if err != nil {
		return err
	}
	// List all Pods owned by this Deployment, grouped by their ReplicaSet.
	// Current uses of the podMap are:
	//
	// * check if a Pod is labeled correctly with the pod-template-hash label.
	// * check that no old Pods are running in the middle of Recreate Deployments.
	podMap, err := dc.getPodMapForDeployment(d, rsList)
	if err != nil {
		return err
	}

	//...

	if d.Spec.Paused {
		return dc.sync(ctx, d, rsList)
	}

	// rollback is not re-entrant in case the underlying replica sets are updated with a new
	// revision so we should ensure that we won't proceed to update replica sets until we
	// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
	if getRollbackTo(d) != nil {
		return dc.rollback(ctx, d, rsList)
	}

	scalingEvent, err := dc.isScalingEvent(ctx, d, rsList)
	if err != nil {
		return err
	}
	if scalingEvent {
		return dc.sync(ctx, d, rsList)
	}

	switch d.Spec.Strategy.Type {
	case apps.RecreateDeploymentStrategyType:
		return dc.rolloutRecreate(ctx, d, rsList, podMap)
	case apps.RollingUpdateDeploymentStrategyType:
		return dc.rolloutRolling(ctx, d, rsList)
	}
	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
  1. 第一步就是找到 deployment
  2. 第二步是找到 rsList 也是我们说的 RS
  3. 然后找到 podMap

寻找完了之后就开始根据状态进行操作。有哪些操作呢?

  • rollback 回滚
  • scaling 判断现在是不是在调整大小
  • rollout 关键来了,这就是更新,有两种模式
    • Recreate 重建
    • Rolling 滚动更新

这里我们最关心的策略终于暴露出来了,那就是滚动更新了,我们赶快来看看里面是怎么实现的。

rolloutRolling

代码语言:javascript
复制
// pkg/controller/deployment/rolling.go:31
// rolloutRolling implements the logic for rolling a new replica set.
func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)

	// Scale up, if we can.
	scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d)
	if err != nil {
		return err
	}
	if scaledUp {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	// Scale down, if we can.
	scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
	if err != nil {
		return err
	}
	if scaledDown {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	if deploymentutil.DeploymentComplete(d, &d.Status) {
		if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
			return err
		}
	}

	// Sync deployment status
	return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

步骤其实比我想的要简单:

  1. 得到新旧的 RS 进行比较
  2. 先看要不要扩副本数,如果要则直接扩,并且就直接 sync 了不继续了
  3. 然后才轮到缩容,能操作就直接操作了。

然后,我们来回忆一下 pod 数量在实际更新中的变动过程,如果目前的 pod 是 3/3(目标/现有),那么扩容之后就会变成 3/4,此时下一次进来就不能扩了,只能变成缩了变成 3/3 然后不断往复,直到所以 pod 都满足期望要求的版本。想想真的蛮奇妙的,就是利用了简单的状态管理就实现了整个滚动更新过程,慢慢的就靠近了目标。这可能就是状态机的优雅吧,你只管改状态,剩下的协调交给我。

码后解答

  1. deployment 是由哪个对象控制的?
    1. DeploymentController
  2. 应用更新的时候 deployment 是如何控制更新过程的?
    1. 关键其实就在于:rolloutRolling ,将目标态的 pod 添加,打破平衡(状态变化),将不满足的旧状态移除,从而慢慢协调到最终状态。再说的简单一点:先尝试 scaledUp 然后尝试 scaledDown

总结提升

设计上

deployment 这里我们能学到哪些设计上的提升点呢?我个人有下面几个

  1. 首先就是 NewDeploymentController 里面对于 Informer 机制的运用
  2. 利用策略模式 RollingUpdateRecreate 两种不同实现很清晰
  3. 利用状态的管理来构建当前 rolloutRolling 的操作,对于编码来说清晰

熟悉了这部分的实现,那么对于其他对象类似的功能,我觉得你应该也能有自己的把握了。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-06-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前置知识
  • 心路历程
  • 码前提问
  • 源码分析
    • 寻码过程
      • Deployment 结构
        • DeploymentController
          • DeploymentController 结构
          • NewDeploymentController
          • addDeployment
          • Run
        • syncDeployment
          • rolloutRolling
          • 码后解答
          • 总结提升
            • 设计上
            相关产品与服务
            容器服务
            腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档