为了保证软件的稳定性,请求重试是网络编程中必须要考虑到的情况之一。Kubernetes作为一款基于网络的分布式软件,对网络重试有着非常完备的考量和实践。本文通过Kubernetes中reflector机制的实现,带领大家深入Kubernetes的世界,看看开源世界最优秀的软件之一Kubernetes,是如何进行请求重试的。
Reflector是Kubernetes中client-go库的一个核心组件,主要负责监视(Watch)特定类型的Kubernetes资源对象,并将所有变化反映(Reflect)到本地缓存(Store)中。
其中最为核心的函数就是:
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error
这个函数有两个主要职责:
首先通过List操作获取资源对象的完整列表,对应着函数
func (r *Reflector) list(stopCh <-chan struct{}) error
然后建立Watch连接来监听对象的变更事件,对应着函数
func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error
对于Reflector
的讲解不在展开,有兴趣的读者可以根据上面的介绍自行去代码库中查看,截止2024年12月8日,Reflector
的相关代码存放在Kubernetes代码库的staging/src/k8s.io/client-go/tools/cache/reflector.go
中。
接下来,我们深入Reflector
的具体实现,剖析Reflector
是如何实现优雅,合理的重试机制,以及我们如何将这些经验应用到自己的项目中去。
事实上并不是所有的网络错误都是需要重试的。在Kubernetes的Reflector
中,大致有以下几种情况需要进行重试:
utilnet.IsConnectionRefused(err)
apierrors.IsTooManyRequests(err)
apierrors.IsInternalError(err) && retry.ShouldRetry()
针对不同的情况Kuberernes展示了不同的重试方式:
对于网络连接错误和服务器负载过大的情况,Kubernetes采用了完全相同的处理方式,下面的代码反映了处理这种情况的核心模型:
case <-stopCh:
return nil
case <-r.backoffManager.Backoff().C():
continue
其中stopCh是一个只读管道,上层调用栈会通过stopCh来控制下层调用栈中的操作,当上层调用栈向stopCh中写入数据时,会强制结束下层操作(即 return nil
)。
而r.backoffManager.Backoff().C()
返回的是同样是一个只读管道,具体定义如下:
func (b *backoffManager) Backoff() clock.Timer {
b.lock.Lock()
defer b.lock.Unlock()
if b.timer == nil {
b.timer = b.clock.NewTimer(b.Step())
} else {
b.timer.Reset(b.Step())
}
return b.timer
}
这里的clock.Timer
是对time.Timer
的封装。<-r.backoffManager.Backoff().C()
其实就是通过阻塞当前协程,等待b.Step()
中设置的时间间隔,到期之后触发后续的continue操作,也就是重试。
在这里面需要特别注意的是:b.Step()
采用的是逐渐增大的重试间隔,数学公式如下,对于第 n 次重试,等待时间 $T_n$ 的计算公式为:
initial\_interval : 初始等待时间
factor : 指数因子(通常为2)
n : 重试次数(从0开始)
cap : 最大等待时间
\delta_n : 随机抖动值`
简单来说,随着重试失败次数增多,重试之间的时间间隔也会变长,防止频繁重试对服务造成额外负载,导致恶性循环。
为了方便大家理解,我总结提炼了这种重试策略,以一个小程序的形式,实现了这个这个策略,方便大家进行理解。
package main
import (
"errors"
"fmt"
"math/rand"
"sync"
"time"
"k8s.io/utils/clock"
)
// BackoffManager defines the retry management interface.
type BackoffManager interface {
Backoff() clock.Timer
}
// backoffManager implements retry management.
type backoffManager struct {
duration time.Duration // Initial retry interval
factor float64 // Exponential backoff factor
jitter float64 // Jitter factor
steps int // Maximum retry attempts (not used in this example)
cap time.Duration // Maximum retry interval
lock sync.Mutex
timer clock.Timer
clock clock.Clock
}
// NewBackoffManager creates a new retry manager.
func NewBackoffManager(initial time.Duration, maxInterval time.Duration, factor float64, jitter float64, clock clock.Clock) BackoffManager {
return &backoffManager{
duration: initial,
factor: factor,
jitter: jitter,
cap: maxInterval,
clock: clock,
}
}
// Backoff implements the retry logic.
func (b *backoffManager) Backoff() clock.Timer {
b.lock.Lock()
defer b.lock.Unlock()
duration := b.duration
if b.jitter > 0.0 {
duration = addJitter(duration, b.jitter)
}
if b.factor > 0.0 {
b.duration = time.Duration(float64(b.duration) * b.factor)
if b.cap > 0 && b.duration > b.cap {
b.duration = b.cap
}
}
if b.timer == nil {
b.timer = b.clock.NewTimer(duration)
} else {
b.timer.Reset(duration)
}
return b.timer
}
func addJitter(duration time.Duration, jitter float64) time.Duration {
multiplier := 1.0 + rand.Float64()*jitter
return time.Duration(float64(duration) * multiplier)
}
func unstableOperation() error {
// Simulate a randomly failing operation.
if rand.Float64() < 0.7 {
return errors.New("service temporarily unavailable")
}
return nil
}
func main() {
stopCh := make(chan struct{})
backoffMgr := NewBackoffManager(
100*time.Millisecond, // Initial retry interval
5*time.Second, // Maximum retry interval
2.0, // Exponential factor
0.1, // Jitter factor
clock.RealClock{}, // Use real clock
)
go func() {
time.Sleep(10 * time.Second)
close(stopCh)
}()
attempts := 0
maxAttempts := 10
fmt.Println("Starting retry operation...")
for attempts < maxAttempts {
attempts++
err := unstableOperation()
if err == nil {
fmt.Printf("Operation successful! Retried %d times\n", attempts-1)
return
}
fmt.Printf("Attempt %d failed: %v\n", attempts, err)
// Implement retry logic
select {
case <-stopCh:
fmt.Println("Stop signal received, exiting retry loop")
return
case <-backoffMgr.Backoff().C():
fmt.Println("Continuing retry after backoff...")
continue
}
}
fmt.Printf("Reached maximum retry attempts %d, operation ultimately failed\n", maxAttempts)
}
程序中规定服务器出现错误的概率是70%。当出现重试时会打印如下日志:
Starting retry operation...
Attempt 1 failed: service temporarily unavailable
Continuing retry after backoff...
Attempt 2 failed: service temporarily unavailable
Continuing retry after backoff...
Operation successful! Retried 2 times
如果超过10s的最大的时长限制,主流程中启动的协程会强制关闭stopCh,结束请求访问。
go func() {
time.Sleep(10 * time.Second)
close(stopCh)
}()
Starting retry operation...
Attempt 1 failed: service temporarily unavailable
Continuing retry after backoff...
Attempt 2 failed: service temporarily unavailable
Continuing retry after backoff...
Attempt 3 failed: service temporarily unavailable
Continuing retry after backoff...
Attempt 4 failed: service temporarily unavailable
Continuing retry after backoff...
Attempt 5 failed: service temporarily unavailable
Continuing retry after backoff...
Attempt 6 failed: service temporarily unavailable
Continuing retry after backoff...
Attempt 7 failed: service temporarily unavailable
Stop signal received, exiting retry loop
对于服务器内部错误,可以理解为请求虽然被服务器正确接收和处理,但因为意外情况,返回内部错误。此时,服务器的负载可能并不大,不需要有意的控制重试的时间间隔,但是也不能频繁重试,这可能会导致服务器负载加大,此时Kubernetes采用的策略是:直接通过时间窗口判断是否重试。
func (r *retryWithDeadlineImpl) ShouldRetry() bool {
if r.maxRetryDuration <= time.Duration(0) {
return false
}
if r.clock.Now().Sub(r.firstErrorTime) <= r.maxRetryDuration {
return true
}
r.reset()
return false
}
上面函数是判断是否重试的判断逻辑,简单来说就是判断当前时间距离第一次错误时间是否超过maxRetryDuration
,如果超过则不重试,没超过,则直接进行重试,可以说非常简单粗暴。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。