在分析go/rate组件的设计之前,我们需要知道这个组件的功能。rate 见名思义就是用来做"频次控制"的,用的是Token Bucket(令牌桶) 算法实现的。
下面我列举了一下这个组件提供的一些主要的方法:
我目前用的版本是go15.6版本, 下面的案例也是官方给出的一个案例。
// 使用golang/rate 实现, 令牌桶算法
func TestRateLimitByGoRate1(t *testing.T) {
ticker := rate.NewLimiter(3, 6)
length := 20
chs := make([]chan string, length)
for i := 0; i < length; i++ {
chs[i] = make(chan string, 1)
go func(taskId string, ch chan string, r *rate.Limiter) {
err := ticker.Wait(context.TODO())
if err != nil {
ch <- "Task-" + taskId + " not allow " + time.Now().Format(layout)
}
time.Sleep(time.Duration(5) * time.Millisecond)
ch <- "Task-" + taskId + " run success " + time.Now().Format(layout)
return
}(strconv.FormatInt(int64(i), 10), chs[i], ticker)
}
for _, ch := range chs {
t.Log("task start at " + <-ch)
}
}
输出结果:
=== RUN TestRateLimitByGoRate1
rateLimit_test.go:121: task start at Task-0 run success 2021-07-07 16:13:06
rateLimit_test.go:121: task start at Task-1 run success 2021-07-07 16:13:06
rateLimit_test.go:121: task start at Task-2 run success 2021-07-07 16:13:07
rateLimit_test.go:121: task start at Task-3 run success 2021-07-07 16:13:10
rateLimit_test.go:121: task start at Task-4 run success 2021-07-07 16:13:08
rateLimit_test.go:121: task start at Task-5 run success 2021-07-07 16:13:08
rateLimit_test.go:121: task start at Task-6 run success 2021-07-07 16:13:11
rateLimit_test.go:121: task start at Task-7 run success 2021-07-07 16:13:06
rateLimit_test.go:121: task start at Task-8 run success 2021-07-07 16:13:06
rateLimit_test.go:121: task start at Task-9 run success 2021-07-07 16:13:06
rateLimit_test.go:121: task start at Task-10 run success 2021-07-07 16:13:07
rateLimit_test.go:121: task start at Task-11 run success 2021-07-07 16:13:09
rateLimit_test.go:121: task start at Task-12 run success 2021-07-07 16:13:11
rateLimit_test.go:121: task start at Task-13 run success 2021-07-07 16:13:09
rateLimit_test.go:121: task start at Task-14 run success 2021-07-07 16:13:07
rateLimit_test.go:121: task start at Task-15 run success 2021-07-07 16:13:08
rateLimit_test.go:121: task start at Task-16 run success 2021-07-07 16:13:09
rateLimit_test.go:121: task start at Task-17 run success 2021-07-07 16:13:10
rateLimit_test.go:121: task start at Task-18 run success 2021-07-07 16:13:10
rateLimit_test.go:121: task start at Task-19 run success 2021-07-07 16:13:06
--- PASS: TestRateLimitByGoRate1 (4.67s)
PASS
令牌桶算法有2类关键对象,即一个是堵塞队列(BlockingQueue)用来存储令牌对象,一个是定时器(Timer)用于定时生成令牌对象,但是BlockingQueue和Timer实现比较复杂。go官方采用的则是一种lock+计数器的“懒加载”模式的令牌桶算法。lock是为了保护计数器的原子性操作;令牌只有在实际使用的时候才会进行计算也就省去了对堵塞队列和Timer的依赖,降低了整个算法的复杂度。
前面我们就有说过ReserveN 是获取tokens的统一入口, 它是整个组件的最核心方法。它包含了3个参数,分别是预订时间now,需要预定的tokens数n,最大的等待时间maxFutureReserve。当有请求要获取token时,都需要经过这个方法,且不论是否成功,都会返回Reservationd表示预订记录,Reservationd的定义如下:
type Reservation struct {
// 表示是否预订成功
ok bool
// 限流器
lim *Limiter
// 当前tokens数
tokens int
// 需要等待的截止时间
timeToAct time.Time
// This is the Limit at reservation time, it can change later.
// 预订的时间
limit Limit
}
下面我们具体分析一下ReserveN这个方法的实现部分:
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()
if lim.limit == Inf {
lim.mu.Unlock()
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: now,
}
}
// step1. 计算截止当前时间now, 桶里面还有多少token。
now, last, tokens := lim.advance(now)
// step2. 计算还需要生成多少token才够n个token
// Calculate the remaining number of tokens resulting from the request.
tokens -= float64(n)
// Calculate the wait duration
var waitDuration time.Duration
if tokens < 0 {
// 生成tokens个令牌需要等待多长时间
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// step3. 如果等待时间超过了用户的maxFutureReserve,则表示预订失败,直接返回
// 否则返回预定成功,并告知用户交货的时间timeToAct。
// Decide result
ok := n <= lim.burst && waitDuration <= maxFutureReserve
// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
if ok {
r.tokens = n
r.timeToAct = now.Add(waitDuration)
}
// Update state
if ok {
lim.last = now
lim.tokens = tokens
lim.lastEvent = r.timeToAct
} else {
lim.last = last
}
lim.mu.Unlock()
return r
}
可以看到算法基本思路如下:
lim.advance
的实现:func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
last := lim.last
if now.Before(last) {
last = now
}
// Avoid making delta overflow below when last is very old.
maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
elapsed := now.Sub(last)
if elapsed > maxElapsed {
elapsed = maxElapsed
}
// Calculate the new number of tokens, due to time that passed.
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
// 返回当前时间, 上一次领取时间,当前桶里面已经有的tokens
return now, last, tokens
}
durationFromTokens用于计算把桶装满的需要的时间maxElapsed,如果时间now距离上一次领取token的时间last的时间elapsed大于maxElapsed,我们取maxElapsed,否则就取elapsed, 这一步主要为了防止桶溢出。
tokensFromDuration用于计算elapsed时间里新增的token数,为了避免float64精度问题这里分别对在秒和纳秒位上进行一次计算,并进行汇总。
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
// Split the integer and fractional parts ourself to minimize rounding errors.
// See golang.org/issues/34861.
sec := float64(d/time.Second) * float64(limit)
nsec := float64(d%time.Second) * float64(limit)
return sec + nsec/1e9
}
基于ReserveN方法,我们可以衍生出多种不同方法。比如Allow/AllowN就是快速查询桶里面的token是否足量, 方法不会堵塞当前goroutine;Wait/WaitN 判断获取n个token需要等待多久,会堵塞当前goroutine。如果想自己根据预订信息处理,则直接使用Reserve/ReserveN方法就行了。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。