前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >系统库golang.org/x/time/rate 限频器bug

系统库golang.org/x/time/rate 限频器bug

原创
作者头像
wish42
发布于 2021-10-19 08:54:19
发布于 2021-10-19 08:54:19
9620
举报
文章被收录于专栏:Go fasterGo faster

背景

最近在使用限频器时发现golang辅助系统库中的限频器有bug,分享出来与大家一起探讨一下。

测试代码:

代码语言:txt
AI代码解释
复制
package main

import (
	"fmt"
	"sync/atomic"
	"time"
	"golang.org/x/time/rate"
)

func main() {
	var succCount, failCount int64
	limit := rate.Every(100 * time.Millisecond)
	burst := 1
	limiter := rate.NewLimiter(limit, burst)
	start := time.Now()
	for i := 0; i < 5000; i++ {
		go func() {
			for {
				if limiter.Allow() {
					atomic.AddInt64(&succCount, 1)
				} else {
					atomic.AddInt64(&failCount, 1)
				}
			}
		}()
	}
	time.Sleep(2 * time.Second)
	elapsed := time.Since(start)
	fmt.Println("elapsed=", elapsed, "succCount=", atomic.LoadInt64(&succCount), "failCount=", atomic.LoadInt64(&failCount))
}

输出:

代码语言:txt
AI代码解释
复制
elapsed= 2.010675962s succCount= 24849 failCount= 6894827

使用的go版本:

代码语言:txt
AI代码解释
复制
go version go1.16.2 darwin/amd64

从上例可以看出,设置的qps是每秒通过10个请求,但在多协程并发场景下2s的时间段内竟然通过了24849个请求。在trpc服务场景中使用时每个请求也都会开一个协程进行业务逻辑处理,那在这种场景下岂不是就bug了。

原因

我们深入代码看一下:

看time/rate的源码,Allow函数的实现只是AllowN(time.Now(), 1)的便捷实现:

代码语言:txt
AI代码解释
复制
// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *Limiter) Allow() bool {
	return lim.AllowN(time.Now(), 1)
}

AllowN又调用了reserveN方法:

代码语言:txt
AI代码解释
复制
// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
func (lim *Limiter) AllowN(now time.Time, n int) bool {
	return lim.reserveN(now, n, 0).ok
}

reserveN的实现就很有意思了,

代码语言:txt
AI代码解释
复制
// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()

	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}

	now, last, tokens := lim.advance(now)

	// Calculate the remaining number of tokens resulting from the request.
	tokens -= float64(n)

	// Calculate the wait duration
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}

	// Decide result
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}

	// Update state
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	} else {
		lim.last = last
	}

	lim.mu.Unlock()
	return r
}

这里面比较重要的方法是advance的实现:

代码语言:txt
AI代码解释
复制
// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
	last := lim.last
	if now.Before(last) {
		last = now
	}

	// Avoid making delta overflow below when last is very old.
	maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
	elapsed := now.Sub(last)
	if elapsed > maxElapsed {
		elapsed = maxElapsed
	}

	// Calculate the new number of tokens, due to time that passed.
	delta := lim.limit.tokensFromDuration(elapsed)
	tokens := lim.tokens + delta
	if burst := float64(lim.burst); tokens > burst {
		tokens = burst
	}

	return now, last, tokens
}

这个函数中返回了三个参数newNow,newLast,newTokens。

从代码中可以看出第一个参数newNow其实完全就是传入的入参now直接返回了,所以这个第一个返回值其实是没有必要的;

第二个参数,是返回上次tokens被更新的时间点,如果当前传入的时间点是在上次更新的时间点之前的话同样会返回当前传入的时间点;

第三个参数newTokens是根据当前的时间点与上次更新的时间点之间的流逝时间转换成token数量进行返回。

让我们结合本人加上的中文注释再回头看一下reserveN的实现逻辑:

代码语言:txt
AI代码解释
复制
// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
    // 加个锁先,这个好理解
	lim.mu.Lock()
    
    // 判断limit是否为无限大,直接返回ok
	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}
    // 通过advance函数获取到now这个时间点可以用的token数量
	now, last, tokens := lim.advance(now)
    
	// Calculate the remaining number of tokens resulting from the request.
	tokens -= float64(n)

	// Calculate the wait duration
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}

	// Decide result
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}
    // 更新状态这里,如果ok了就更新当前的时间点以及需要更新的字段,但如果不ok的话为什么需要更新last字段呢
	// Update state
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	} else {
		lim.last = last
	}

	lim.mu.Unlock()
	return r
}

上面代码注释中已经指出了如果获取不ok的话,这里只更新了lim中的last字段,我们来考证一下:

代码语言:txt
AI代码解释
复制
type Limiter struct {
	limit Limit
	burst int

	mu     sync.Mutex
	tokens float64
	// last is the last time the limiter's tokens field was updated
	last time.Time
	// lastEvent is the latest time of a rate-limited event (past or future)
	lastEvent time.Time
}

这里的注释写的很清楚,last是上面的tokens字段更新的时间点。所以上面reserveN中更新last字段的操作显得很迷。

所以这里改一下系统库的代码验证一下,将reserveN的方法修改一下:

代码语言:txt
AI代码解释
复制
// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()

	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}
    
    // 这里忽略advance返回的第二个字段
	now, _, tokens := lim.advance(now)

	// Calculate the remaining number of tokens resulting from the request.
	tokens -= float64(n)

	// Calculate the wait duration
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}

	// Decide result
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}

	// Update state
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	}
	// 这里如果不ok的话不执行任何操作

	lim.mu.Unlock()
	return r
}

修改点在注释里已经标明了,让上面的测试代码依赖修改后的库代码重新执行可得:

代码语言:txt
AI代码解释
复制
package main

import (
	"fmt"
	"sync/atomic"
	"time"
	//"golang.org/x/time/rate"

	rate "git.code.oa.com/gcd/go-utils/comm/trate"
)

func main() {
	var succCount, failCount int64
	limit := rate.Every(100 * time.Millisecond)
	burst := 1
	limiter := rate.NewLimiter(limit, burst)
	start := time.Now()
	for i := 0; i < 5000; i++ {
		go func() {
			for {
				if limiter.Allow() {
					atomic.AddInt64(&succCount, 1)
				} else {
					atomic.AddInt64(&failCount, 1)
				}
			}
		}()
	}
	time.Sleep(2 * time.Second)
	elapsed := time.Since(start)
	fmt.Println("elapsed=", elapsed, "succCount=", atomic.LoadInt64(&succCount), "failCount=", atomic.LoadInt64(&failCount))
}
代码语言:txt
AI代码解释
复制
elapsed= 2.009816654s succCount= 21 failCount= 7513617

进展

为此在GitHub上为系统库提了一个issue,发现早在2017年的时候就有人发现了这个问题并为此提了fix的建议,但一直没有被合并到master。

总结

大家如果使用了这个限频器的话,注意避坑。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
golang源码分析:time/rate
底层都是调用了reserveN函数,maxFutureReserve参数传的是0
golangLeetcode
2022/08/02
5320
Go微服务--令牌桶实现原理
在上一篇文章 Go微服务: 令牌桶 当中简单的介绍了令牌桶实现的原理,然后利用 /x/time/rate 这个库 10 行代码写了一个基于 ip 的 gin 限流中间件,那这个功能是怎么实现的呢?接下来我们就从源码层面来了解一下这个库的实现。这个实现很有意思,并没有真正的使用一个定时器不断的生成令牌,而是靠计算的方式来完成
冬夜先生
2021/09/03
4350
go限流组件包rate源码分析
在分析go/rate组件的设计之前,我们需要知道这个组件的功能。rate 见名思义就是用来做"频次控制"的,用的是Token Bucket(令牌桶) 算法实现的。
Johns
2021/07/19
1.5K0
go限流组件包rate源码分析
golang 标准库 time/rate 介绍
golang官方库中有一个rate包,实现了令牌桶算法。仓库地址:https://github.com/golang/time
ppxai
2020/09/23
1K0
Golang(七)golang.org/x/time/rate 实现频率限制
1. 源码阅读 整个包实现原理基于令牌桶算法:随时间以 1/r 个令牌的速度向容积为 b 个令牌的桶中添加令牌,有请求就取走令牌,若令牌不足则不执行请求或者等待 Allow 方法的调用链:lim.Allow() bool → lim.AllowN(time.Now(), 1) → lim.reserveN(now, n, 0).ok,因此 reserveN 方法的实现很关键 // Allow is shorthand for AllowN(time.Now(), 1). func (lim *Limiter
西凉风雷
2022/11/23
4260
高并发系统的限流策略:漏桶和令牌桶(附源码剖析)
漏桶算法比较好理解,假设我们现在有一个水桶,我们向这个水桶里添水,虽然我们我们无法预计一次会添多少水,也无法预计水流入的速度,但是可以固定出水的速度,不论添水的速率有多大,都按照固定的速率流出,如果桶满了,溢出的上方水直接抛弃。我们把水当作HTTP请求,每次都把请求放到一个桶中,然后以固定的速率处理请求,说了这么多,不如看一个图加深理解(图片来自于网络,手残党不会画,多多包涵):
Golang梦工厂
2022/07/08
1K0
高并发系统的限流策略:漏桶和令牌桶(附源码剖析)
Golang 限流器(1) - Golang 标准库限流器
上图可以看出 client-go 用到了 workqueue 队列 来处理 从 DeltaFIFO pop 出来的内容,workqueue 队列用到了限流队列(微服务中常用的技术,防止性能过载,从而导致任务处理失败)。
后端云
2023/02/10
3810
Golang 限流器(1) - Golang 标准库限流器
Go 基于令牌桶的限流器
如果一般流量过大,下游系统反应不过来,这个时候就需要限流了,其实和上地铁是一样的,就是减慢上游访问下游的速度。
王小明_HIT
2021/11/12
4.3K0
Golang 标准库 限流器 time/rate 设计与实现
友情提示:此篇文章大约需要阅读 12分钟7秒,不足之处请多指教,感谢你的阅读。
Meng小羽
2020/08/25
2.4K0
Golang限流器time/rate正确打开姿势
今天聊一下go语言限流工具的 golang.org/x/time/rate 包下Limiter的用法
每周聚焦
2024/09/30
1340
Golang限流器time/rate正确打开姿势
Golang官方限流器的用法详解
限流器是提升服务稳定性的非常重要的组件,可以用来限制请求速率,保护服务,以免服务过载。限流器的实现方法有很多种,常见的限流算法有固定窗口、滑动窗口、漏桶、令牌桶,我在前面的文章 「常用限流算法的应用场景和实现原理」 中给大家讲解了这几种限流方法自身的特点和应用场景,其中令牌桶在限流的同时还可以应对一定的突发流量,与互联网应用容易因为热点事件出现突发流量高峰的特点更契合。
KevinYan
2021/07/15
6.3K0
golang 令牌桶限流器 rate
令牌桶算法(Token Bucket)随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms)往桶里加入Token,如果桶已经满了就不再加了。新请求来临时,会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务.
七点一刻
2022/06/14
6940
golang 令牌桶限流器 rate
Kubernetes 源码学习之限速队列
前面我们详细分析了 client-go 中的延迟队列的实现,接下来就是限速队列的实现,限速队列在我们日常应用中非常广泛,其原理也比较简单,利用延迟队列的特性,延迟某个元素的插入时间来达到限速的目的。
我是阳明
2020/09/30
3.4K0
Kubernetes 源码学习之限速队列
限速器算法
这种方式可以缓解突发流量对系统的影响,缺点是在流量突发时,由于队列中缓存了旧的请求,导致无法处理新的请求。而且也无法保证请求能够在一定时间内处理完毕。 令牌桶不会缓存请求,它通过颁发令牌的方式来允许请求,因此它存在和漏桶算法一样的问题。
charlieroro
2023/12/28
3060
限速器算法
go每日一库 [go-rate] 速率限制器
go-rate是速率限制器库,基于 Token Bucket(令牌桶)算法实现。 go-rate被用在LangTrend的生产中 用于遵守GitHub API速率限制。
happlyfox
2021/04/05
5.1K0
后端服务不得不了解之限流
现在说到高可用系统,都会说到高可用的保护手段:缓存、降级和限流,本博文就主要说说限流。限流是流量限速(Rate Limit)的简称,是指只允许指定的事件进入系统,超过的部分将被拒绝服务、排队或等待、降级等处理。
好好学java
2021/02/07
1.8K0
Go微服务--令牌桶
举个例子:假设我们桶的容量是 100,速度是 10 rps,那么在我们桶满的情况下,如果突然来 100 个请求是可以满足的,但是后续的请求就会被限制到 10 rps
冬夜先生
2021/09/03
7340
go-zero 是如何实现令牌桶限流的?
上一篇文章介绍了 如何实现计数器限流?主要有两种实现方式,分别是固定窗口和滑动窗口,并且分析了 go-zero 采用固定窗口方式实现的源码。
AlwaysBeta
2023/08/10
8480
聊聊限流器TokenBucket的基本原理及实现
大家好,我渔夫子。上篇文章我们讲解了漏桶(LeakyBucket)的实现原理。本文我们介绍另外一种限流器---令牌桶(TokenBucket)。
Go学堂
2023/01/31
1.2K0
Golang 限流器(2) - beefsack个人的开源 限流器
加注释,换行,不到100行代码的限流包。非常好理解,没有用令牌桶,所以代码非常简单,行数很少。
后端云
2023/02/10
2650
Golang 限流器(2) - beefsack个人的开源 限流器
相关推荐
golang源码分析:time/rate
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档