前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >go限流组件包rate源码分析

go限流组件包rate源码分析

原创
作者头像
Johns
修改2022-06-30 10:28:39
1.5K0
修改2022-06-30 10:28:39
举报
文章被收录于专栏:代码工具

一. API 介绍

在分析go/rate组件的设计之前,我们需要知道这个组件的功能。rate 见名思义就是用来做"频次控制"的,用的是Token Bucket(令牌桶) 算法实现的。

下面我列举了一下这个组件提供的一些主要的方法:

  • NewLimiter (1)显然这是一个构造方法用于生成一个限流器 (2)构造器主要初始化令牌桶算法中令牌桶的大小burst和令牌生成速率rate。 (3)rate是一个limit类型(float64), 除了直接指定我们还可以通过Every函数来生成。
  • Reserve/ReserveN (1)Reserve 相当于 ReserveN(time.Now(), 1) (2)当使用使用 ReserveN 方法时,不管能不能从令牌桶内获取到Token都会返回一个Reservation对象 (3)Reservation对象的 Ok() 方法返回了令牌桶是否可以在最大等待时间内提供请求的令牌数量,如果OK为false,则Delay()返回InfDuration (4)Reservation对象的 Delay() 方法返回了需要等待的时间,如果时间为0则不需要等待 (5)如果不想等待就调用 Reservation对象的 Cancel() (6)ReserveN 是获取tokens的唯一入口,其他方法也都是间接调用它获取token的
  • Wait/WaitN (1)Wait 实际上就是 WaitN(ctx,1) (2)当使用 Wait 方法时,如果令牌桶内Token(大于or等于 N)直接返回,如果当时令牌桶内 Token 不足(小于 N),那么 Wait 方法将会阻塞,直到能从令牌桶内获得 Token (3) 底层还是调用的Reserve/ReserveN 方法
  • Allow/AllowN (1)Allow 实际上就是 AllowN(time.Now(),1)。 (2)当使用使用 AllowN 方法时,截止到time.Now()这一时刻(time可以自己传入)令牌桶中数目必须(大于or等于 N),满足则返回正确,同时从令牌桶中消费 N 个 Token (3)应用场景请求速度太快就直接丢掉一些请求。 (4)底层还是调用的Reserve/ReserveN 方法
  • SetLimit/SetBurst (1)SetLimit 实际上相当于重新设置了token的生成速率。 (2)SetBurst 相当于重新设置了桶的大小。 (3)SetLimit/SetBurst 重置了整个令牌桶, 用户再借助一些统计信息就可以实现一个动态自适应的限流算法

二. 使用案例

我目前用的版本是go15.6版本, 下面的案例也是官方给出的一个案例。

代码语言:txt
复制
// 使用golang/rate 实现, 令牌桶算法
func TestRateLimitByGoRate1(t *testing.T) {
	ticker := rate.NewLimiter(3, 6)
	length := 20
	chs := make([]chan string, length)
	for i := 0; i < length; i++ {
		chs[i] = make(chan string, 1)
		go func(taskId string, ch chan string, r *rate.Limiter) {
			err := ticker.Wait(context.TODO())
			if err != nil {
				ch <- "Task-" + taskId + " not allow " + time.Now().Format(layout)
			}

			time.Sleep(time.Duration(5) * time.Millisecond)
			ch <- "Task-" + taskId + " run success  " + time.Now().Format(layout)
			return

		}(strconv.FormatInt(int64(i), 10), chs[i], ticker)
	}
	for _, ch := range chs {
		t.Log("task start at " + <-ch)
	}
}

输出结果:

代码语言:txt
复制
=== RUN   TestRateLimitByGoRate1
    rateLimit_test.go:121: task start at Task-0 run success  2021-07-07 16:13:06
    rateLimit_test.go:121: task start at Task-1 run success  2021-07-07 16:13:06
    rateLimit_test.go:121: task start at Task-2 run success  2021-07-07 16:13:07
    rateLimit_test.go:121: task start at Task-3 run success  2021-07-07 16:13:10
    rateLimit_test.go:121: task start at Task-4 run success  2021-07-07 16:13:08
    rateLimit_test.go:121: task start at Task-5 run success  2021-07-07 16:13:08
    rateLimit_test.go:121: task start at Task-6 run success  2021-07-07 16:13:11
    rateLimit_test.go:121: task start at Task-7 run success  2021-07-07 16:13:06
    rateLimit_test.go:121: task start at Task-8 run success  2021-07-07 16:13:06
    rateLimit_test.go:121: task start at Task-9 run success  2021-07-07 16:13:06
    rateLimit_test.go:121: task start at Task-10 run success  2021-07-07 16:13:07
    rateLimit_test.go:121: task start at Task-11 run success  2021-07-07 16:13:09
    rateLimit_test.go:121: task start at Task-12 run success  2021-07-07 16:13:11
    rateLimit_test.go:121: task start at Task-13 run success  2021-07-07 16:13:09
    rateLimit_test.go:121: task start at Task-14 run success  2021-07-07 16:13:07
    rateLimit_test.go:121: task start at Task-15 run success  2021-07-07 16:13:08
    rateLimit_test.go:121: task start at Task-16 run success  2021-07-07 16:13:09
    rateLimit_test.go:121: task start at Task-17 run success  2021-07-07 16:13:10
    rateLimit_test.go:121: task start at Task-18 run success  2021-07-07 16:13:10
    rateLimit_test.go:121: task start at Task-19 run success  2021-07-07 16:13:06
--- PASS: TestRateLimitByGoRate1 (4.67s)
PASS

三. 源码实现

令牌桶算法有2类关键对象,即一个是堵塞队列(BlockingQueue)用来存储令牌对象,一个是定时器(Timer)用于定时生成令牌对象,但是BlockingQueue和Timer实现比较复杂。go官方采用的则是一种lock+计数器的“懒加载”模式的令牌桶算法。lock是为了保护计数器的原子性操作;令牌只有在实际使用的时候才会进行计算也就省去了对堵塞队列和Timer的依赖,降低了整个算法的复杂度。

前面我们就有说过ReserveN 是获取tokens的统一入口, 它是整个组件的最核心方法。它包含了3个参数,分别是预订时间now,需要预定的tokens数n,最大的等待时间maxFutureReserve。当有请求要获取token时,都需要经过这个方法,且不论是否成功,都会返回Reservationd表示预订记录,Reservationd的定义如下:

代码语言:txt
复制
type Reservation struct {
  // 表示是否预订成功
   ok        bool
  // 限流器
   lim       *Limiter
  // 当前tokens数
   tokens    int
  // 需要等待的截止时间
   timeToAct time.Time
   // This is the Limit at reservation time, it can change later.
  // 预订的时间
   limit Limit
}

下面我们具体分析一下ReserveN这个方法的实现部分:

代码语言:txt
复制
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()

	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}

  // step1. 计算截止当前时间now, 桶里面还有多少token。
	now, last, tokens := lim.advance(now)

  // step2. 计算还需要生成多少token才够n个token
	// Calculate the remaining number of tokens resulting from the request.
	tokens -= float64(n)
	// Calculate the wait duration
	var waitDuration time.Duration
	if tokens < 0 {
    // 生成tokens个令牌需要等待多长时间
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}

  // step3. 如果等待时间超过了用户的maxFutureReserve,则表示预订失败,直接返回
  // 否则返回预定成功,并告知用户交货的时间timeToAct。
	// Decide result
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}

	// Update state
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	} else {
		lim.last = last
	}

	lim.mu.Unlock()
	return r
}

可以看到算法基本思路如下:

Step 1:计算截止当前时间now, 实时计算桶里面还有多少token。可以看到lim.advance的实现:
代码语言:txt
复制
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
	last := lim.last
	if now.Before(last) {
		last = now
	}
	// Avoid making delta overflow below when last is very old.
	maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
	elapsed := now.Sub(last)
	if elapsed > maxElapsed {
		elapsed = maxElapsed
	}

	// Calculate the new number of tokens, due to time that passed.
	delta := lim.limit.tokensFromDuration(elapsed)
	tokens := lim.tokens + delta
	if burst := float64(lim.burst); tokens > burst {
		tokens = burst
	}

  // 返回当前时间, 上一次领取时间,当前桶里面已经有的tokens
	return now, last, tokens
}

durationFromTokens用于计算把桶装满的需要的时间maxElapsed,如果时间now距离上一次领取token的时间last的时间elapsed大于maxElapsed,我们取maxElapsed,否则就取elapsed, 这一步主要为了防止桶溢出。

tokensFromDuration用于计算elapsed时间里新增的token数,为了避免float64精度问题这里分别对在秒和纳秒位上进行一次计算,并进行汇总。

代码语言:txt
复制
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
	// Split the integer and fractional parts ourself to minimize rounding errors.
	// See golang.org/issues/34861.
	sec := float64(d/time.Second) * float64(limit)
	nsec := float64(d%time.Second) * float64(limit)
	return sec + nsec/1e9
}
Step 2:基于step1我们知道了还需要生成多少token才够n个token, 这一步我们就需要计算生成剩余token需要多长时waitDuration。
Step3: 封装预订的结果信息,包含3种可能:
  • 如果tokens在第一步足量,那么直接返回预订状态ok=true,等待截止时间timeToAct=0
  • 如果tokens在第一步不足,那么分2中情况:
    • 如果计算需要等待的时间waitDuration>maxFutureReserve, 那么预订状态ok=false
    • 如果计算需要等待的时间waitDuration<maxFutureReserve, 那么预订状态ok=true, timeToAct=now+waitDuration

基于ReserveN方法,我们可以衍生出多种不同方法。比如Allow/AllowN就是快速查询桶里面的token是否足量, 方法不会堵塞当前goroutine;Wait/WaitN 判断获取n个token需要等待多久,会堵塞当前goroutine。如果想自己根据预订信息处理,则直接使用Reserve/ReserveN方法就行了。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一. API 介绍
  • 二. 使用案例
  • 三. 源码实现
    • Step 1:计算截止当前时间now, 实时计算桶里面还有多少token。可以看到lim.advance的实现:
      • Step 2:基于step1我们知道了还需要生成多少token才够n个token, 这一步我们就需要计算生成剩余token需要多长时waitDuration。
        • Step3: 封装预订的结果信息,包含3种可能:
        相关产品与服务
        流计算 Oceanus
        流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档