首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Kubernetes源码学习笔记------如何实现优雅重试

Kubernetes源码学习笔记------如何实现优雅重试

原创
作者头像
北京柴道
发布2024-12-08 19:12:40
发布2024-12-08 19:12:40
2680
举报

重试机制

为了保证软件的稳定性,请求重试是网络编程中必须要考虑到的情况之一。Kubernetes作为一款基于网络的分布式软件,对网络重试有着非常完备的考量和实践。本文通过Kubernetes中reflector机制的实现,带领大家深入Kubernetes的世界,看看开源世界最优秀的软件之一Kubernetes,是如何进行请求重试的。

reflector简介

Reflector是Kubernetes中client-go库的一个核心组件,主要负责监视(Watch)特定类型的Kubernetes资源对象,并将所有变化反映(Reflect)到本地缓存(Store)中。

其中最为核心的函数就是:

代码语言:go
复制
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error

这个函数有两个主要职责:

首先通过List操作获取资源对象的完整列表,对应着函数

代码语言:go
复制
func (r *Reflector) list(stopCh <-chan struct{}) error

然后建立Watch连接来监听对象的变更事件,对应着函数

代码语言:go
复制
func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error

对于Reflector的讲解不在展开,有兴趣的读者可以根据上面的介绍自行去代码库中查看,截止2024年12月8日,Reflector的相关代码存放在Kubernetes代码库的staging/src/k8s.io/client-go/tools/cache/reflector.go中。

重试机制实现

接下来,我们深入Reflector的具体实现,剖析Reflector是如何实现优雅,合理的重试机制,以及我们如何将这些经验应用到自己的项目中去。

When?什么时候进行重试?

事实上并不是所有的网络错误都是需要重试的。在Kubernetes的Reflector中,大致有以下几种情况需要进行重试:

  1. 网络连接问题,在这里指服务器因为网络或负载原因拒绝了网络连接。
  2. 服务器负载过大,在这里服务器会通过http状态码(429)主动告知请求发起方,负载过大。
  3. 服务器内部错误,同样由服务器直接通过http状态码直接告知发起方。
代码语言:go
复制
utilnet.IsConnectionRefused(err) 
代码语言:go
复制
apierrors.IsTooManyRequests(err)
代码语言:go
复制
apierrors.IsInternalError(err) && retry.ShouldRetry()

How?如何进行请求重试

针对不同的情况Kuberernes展示了不同的重试方式:

网络连接和负载过大的问题:

对于网络连接错误和服务器负载过大的情况,Kubernetes采用了完全相同的处理方式,下面的代码反映了处理这种情况的核心模型:

代码语言:go
复制
case <-stopCh:
    return nil
case <-r.backoffManager.Backoff().C():
    continue

其中stopCh是一个只读管道,上层调用栈会通过stopCh来控制下层调用栈中的操作,当上层调用栈向stopCh中写入数据时,会强制结束下层操作(即 return nil)。

r.backoffManager.Backoff().C()返回的是同样是一个只读管道,具体定义如下:

代码语言:go
复制
func (b *backoffManager) Backoff() clock.Timer {
	b.lock.Lock()
	defer b.lock.Unlock()
	if b.timer == nil {
		b.timer = b.clock.NewTimer(b.Step())
	} else {
		b.timer.Reset(b.Step())
	}
	return b.timer
}

这里的clock.Timer是对time.Timer的封装。<-r.backoffManager.Backoff().C()其实就是通过阻塞当前协程,等待b.Step()中设置的时间间隔,到期之后触发后续的continue操作,也就是重试。

在这里面需要特别注意的是:b.Step() 采用的是逐渐增大的重试间隔,数学公式如下,对于第 n 次重试,等待时间 $T_n$ 的计算公式为:

T_n = \min(cap, initial\_interval \cdot factor^n + \delta_n)

initial\_interval : 初始等待时间

factor : 指数因子(通常为2)

n : 重试次数(从0开始)

cap : 最大等待时间

\delta_n : 随机抖动值`

简单来说,随着重试失败次数增多,重试之间的时间间隔也会变长,防止频繁重试对服务造成额外负载,导致恶性循环。

为了方便大家理解,我总结提炼了这种重试策略,以一个小程序的形式,实现了这个这个策略,方便大家进行理解。

代码语言:go
复制
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"time"

	"k8s.io/utils/clock"
)

// BackoffManager defines the retry management interface.
type BackoffManager interface {
	Backoff() clock.Timer
}

// backoffManager implements retry management.
type backoffManager struct {
	duration time.Duration // Initial retry interval
	factor   float64       // Exponential backoff factor
	jitter   float64       // Jitter factor
	steps    int           // Maximum retry attempts (not used in this example)
	cap      time.Duration // Maximum retry interval

	lock  sync.Mutex
	timer clock.Timer
	clock clock.Clock
}

// NewBackoffManager creates a new retry manager.
func NewBackoffManager(initial time.Duration, maxInterval time.Duration, factor float64, jitter float64, clock clock.Clock) BackoffManager {
	return &backoffManager{
		duration: initial,
		factor:   factor,
		jitter:   jitter,
		cap:      maxInterval,
		clock:    clock,
	}
}

// Backoff implements the retry logic.
func (b *backoffManager) Backoff() clock.Timer {
	b.lock.Lock()
	defer b.lock.Unlock()

	duration := b.duration
	if b.jitter > 0.0 {
		duration = addJitter(duration, b.jitter)
	}

	if b.factor > 0.0 {
		b.duration = time.Duration(float64(b.duration) * b.factor)
		if b.cap > 0 && b.duration > b.cap {
			b.duration = b.cap
		}
	}

	if b.timer == nil {
		b.timer = b.clock.NewTimer(duration)
	} else {
		b.timer.Reset(duration)
	}
	return b.timer
}

func addJitter(duration time.Duration, jitter float64) time.Duration {
	multiplier := 1.0 + rand.Float64()*jitter
	return time.Duration(float64(duration) * multiplier)
}

func unstableOperation() error {
	// Simulate a randomly failing operation.
	if rand.Float64() < 0.7 {
		return errors.New("service temporarily unavailable")
	}
	return nil
}

func main() {

	stopCh := make(chan struct{})

	backoffMgr := NewBackoffManager(
		100*time.Millisecond, // Initial retry interval
		5*time.Second,        // Maximum retry interval
		2.0,                  // Exponential factor
		0.1,                  // Jitter factor
		clock.RealClock{},    // Use real clock
	)

	go func() {
		time.Sleep(10 * time.Second)
		close(stopCh)
	}()

	attempts := 0
	maxAttempts := 10

	fmt.Println("Starting retry operation...")
	for attempts < maxAttempts {
		attempts++
		err := unstableOperation()
		if err == nil {
			fmt.Printf("Operation successful! Retried %d times\n", attempts-1)
			return
		}

		fmt.Printf("Attempt %d failed: %v\n", attempts, err)

		// Implement retry logic
		select {
		case <-stopCh:
			fmt.Println("Stop signal received, exiting retry loop")
			return
		case <-backoffMgr.Backoff().C():
			fmt.Println("Continuing retry after backoff...")
			continue
		}
	}

	fmt.Printf("Reached maximum retry attempts %d, operation ultimately failed\n", maxAttempts)
}

程序中规定服务器出现错误的概率是70%。当出现重试时会打印如下日志:

代码语言:txt
复制
Starting retry operation...
Attempt 1 failed: service temporarily unavailable
Continuing retry after backoff...
Attempt 2 failed: service temporarily unavailable
Continuing retry after backoff...
Operation successful! Retried 2 times

如果超过10s的最大的时长限制,主流程中启动的协程会强制关闭stopCh,结束请求访问。

代码语言:go
复制
go func() {
    time.Sleep(10 * time.Second)
    close(stopCh)
}()
代码语言:txt
复制
Starting retry operation...
Attempt 1 failed: service temporarily unavailable
Continuing retry after backoff...
Attempt 2 failed: service temporarily unavailable
Continuing retry after backoff...
Attempt 3 failed: service temporarily unavailable
Continuing retry after backoff...
Attempt 4 failed: service temporarily unavailable
Continuing retry after backoff...
Attempt 5 failed: service temporarily unavailable
Continuing retry after backoff...
Attempt 6 failed: service temporarily unavailable
Continuing retry after backoff...
Attempt 7 failed: service temporarily unavailable
Stop signal received, exiting retry loop

服务器内部错误

对于服务器内部错误,可以理解为请求虽然被服务器正确接收和处理,但因为意外情况,返回内部错误。此时,服务器的负载可能并不大,不需要有意的控制重试的时间间隔,但是也不能频繁重试,这可能会导致服务器负载加大,此时Kubernetes采用的策略是:直接通过时间窗口判断是否重试。

代码语言:go
复制
func (r *retryWithDeadlineImpl) ShouldRetry() bool {
	if r.maxRetryDuration <= time.Duration(0) {
		return false
	}

	if r.clock.Now().Sub(r.firstErrorTime) <= r.maxRetryDuration {
		return true
	}

	r.reset()
	return false
}

上面函数是判断是否重试的判断逻辑,简单来说就是判断当前时间距离第一次错误时间是否超过maxRetryDuration,如果超过则不重试,没超过,则直接进行重试,可以说非常简单粗暴。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 重试机制
  • reflector简介
  • 重试机制实现
    • When?什么时候进行重试?
    • How?如何进行请求重试
      • 网络连接和负载过大的问题:
      • 服务器内部错误
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档