最近在参与一个业务迁移的项目。走读代码时,接触到一些限流相关的代码。向老司机请教后了解到,有些业务承载了很高量级的扣款请求,尤其对于一些热点商户,其单点的请求量很大,但某些瓶颈系统的处理能力有限,因此需要做好限流,以保障业务流程中各系统的稳定性。
在走读原有代码时,发现对限流机制做过一次优化。原先的限流机制简单描述为:通过tair计数器来限流,当请求过来时获取资源,计数器+1;业务完成后释放资源,计数器-1;当计数器达到限流阈值时,拒绝这次请求。
理论上,这种机制能够达到限流目的。但如果在业务处理过程中,被某些系统流程 (如日志打印)阻塞了业务线程的正常执行,会导致tair计数器在释放资源时超时失败。由此导致tair计数器数值一直处于高位,真正能通过的请求就寥寥无几了。最终导致业务请求的成功率下跌。
为了根治这个问题,需要对限流机制做优化。tair计数器限流的根本问题在于,有获取和释放资源两步,不能保证获取之后一定能成功释放。需要一个不需要释放资源的限流机制来弥补这一缺陷。令牌桶算法就是一个很好的选择。
想象有一座城堡,入口是一道城门,城外的人必须在城门口获得许可才能进入。为了保证城堡的安全,把守城门的卫兵需要控制单位时间进入城门的人口数量。卫兵的做法是这样的:在城门口放一个桶,桶里有一些令牌,只有拿到令牌的人才能够通过。卫兵每隔一个小时就往桶里扔100个令牌,并且桶最多能容纳100个令牌,如果桶满了就不会再往里扔令牌了。这样一来,平均每小时最多只有100人能进入。
可能有人会问,为什么需要令牌这么麻烦,直接控制每个小时最多只能进入100人不就行了。但是可能会有一种情况,就是早晚集市的时候,一个小时可能不止100人要入城。管理人员既想控制人口流速,又不想一刀切地限制为一个固定的流速。举个例子,在8点的时候,桶里存有100个令牌(之前积累的),8点-9点之间有超过100个人想通过城门。因为8点-9点之间本来就会增加100个令牌,加上桶里留存的100个,这一个小时内最多可以允许200人通过城门,可以满足早集的需求。
类比到接口的限流,也是一样的道理。我们希望控制的是一个平均的流量,同时又想较好地处理突增的高峰流量。从上面的例子中能看出,通过令牌桶算法,在统计意义上,我们做到了限制流量在一个阈值以下。同时,基于令牌桶中“预留”的令牌,又能比较平稳地处理突发的高流量(最多能允许两倍的流量通过)。
令牌桶算法的原理很容易理解,但是真正实现起来就比较有讲究了。看完上面的原理,可能大家的第一感觉就是,用阻塞队列模拟令牌桶,开一个定时器,定时队列里放令牌,使用生产者-消费者模式实现即可。这个方式看起来好像没什么问题,但开启定时器需要新开线程,限流本就是在高并发场景下使用,额外开启线程会给系统带来更多开销。另外,假设我们是针对热点商户进行限流,如果有1万个热点商户,我们就要开启1万个定时器,这个开销是很大的。RateLimiter使用一种巧妙的方式,基于请求的间隔时间,来“模拟”出定时的效果,下面具体来分析一下。
看看效果:
RateLimiter是google开发的guava项目中包含的一个限流类,是基于令牌桶算法实现的。我们先试着使用一下。如下图代码所示,我们先用create方法创建一个RateLimiter限流器,规定每秒往桶里放2个令牌。然后用acquire模拟短时间内的3次请求,分别要取4、4、2个令牌,然后将时间消耗的时间打印出来。
从运行结果,我们可以先直观地理解一下。第一次请求没有任何等待就获取到了4个令牌。此时,因为第一次请求了4个令牌,需要4/2=2s才能“恢复”,所以第二次请求等待了2s。同理,第三次请求也等待了2s才获取到令牌。
看到这里大家可能会疑惑,为什么第一次获取4个令牌不需要任何等待呢?这里涉及到RateLimiter实现令牌桶的一个重要概念,就是“预支”。这里可以简单理解为,第一次请求,“预支”了后面的令牌,而预支的时间,由下一次请求来“承受”。换句话说,这一次你取的多了,下一次就需要等待更久的时间。比如突发流量,我先让它过去,但是后面再来的流量就得等着。这里先这么直观理解,后面通过源码分析会更清晰一些。
现在我们来看下RateLimiter的源码实现。RateLimiter有两种实现,一个是SmoothBursty,一个是SmoothWarmingUp。
我们先看SmoothBursty实现。SmoothBursty是为了应对突发的高流量。
从刚刚的使用可以看出,外部调用只感知create和acquire两个方法。先看create方法。
public static RateLimiter create(double permitsPerSecond) {
return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond);
}
@VisibleForTesting
static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) {
//permitsPerSecond是我们设定的限流值
//这里maxBurstSeconds固定是1.0,表示桶里最多预留1倍的permitsPerSecond
//比如限流值是10,桶里最多就只能放10个令牌。
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
public final void setRate(double permitsPerSecond) {
checkArgument(
permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
synchronized (mutex()) {
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}
final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
看到这里,核心是doSetRate这个方法,这是个抽象方法,在SmoothBursty和SmoothWarmingUp分别实现。
//这是SmoothBursty的实现
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
//有oldMaxPermits和maxPermits是因为,可以动态地修改permitsPerSecond的值
double oldMaxPermits = this.maxPermits;
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
storedPermits = maxPermits;
} else {
//动态修改限流值时,桶里的剩余令牌数也要按照比例来缩放
storedPermits = (oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}
create方法到这里就结束了。还是比较简单的,可以理解为设置了限流值。
重点在下面的acquire方法。我们一层层进去看。
public double acquire(int permits) {
//1. 计算这次请求需要等到多少时间
long microsToWait = reserve(permits);
//2. 阻塞这次请求上面计算出的时间
stopwatch.sleepMicrosUninterruptibly(microsToWait);
//3. 返回等待的时间(转化为微秒)
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
final long reserve(int permits) {
//检查参数,不多说
checkPermits(permits);
synchronized (mutex()) {
//获取等待的时间
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
//又包了一层方法,获取等待时间
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
//等待时间一定是>0的
return max(momentAvailable - nowMicros, 0);
}
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
//将时间重新“同步”到当前时间,直观理解成给桶里补充令牌
resync(nowMicros);
//将下次获取令牌的时间返回。
long returnValue = nextFreeTicketMicros;
//判断存储的令牌数够不够这次请求用的。如果够,那么下面的freshPermits就是0,就不用往后推迟时间
//如果存储的不够,就需要把“下次获取令牌时间”往后推迟
//下面这段代码将的就是计算推迟时间
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
try {
this.nextFreeTicketMicros = LongMath.checkedAdd(nextFreeTicketMicros, waitMicros);
} catch (ArithmeticException e) {
this.nextFreeTicketMicros = Long.MAX_VALUE;
}
//桶里存储的令牌要扣除掉本次消耗的
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
stableIntervalMicros是产生一个令牌需要的时间,比如限流值是2个/s,那么stableIntervalMicros就是500ms也就是500000微秒。
这里关注一下。nextFreeTicketMicros是下次可以获取令牌的时间。从代码中可以看出,我们会根据此次请求需要的令牌数,往后推迟这个时间。这也是为什么,在使用时,前一次请求获取了更多的令牌,要让后面的请求去等待。这里大家可能有疑问,为什么返回的是更新前的nextFreeTicketMicros,而不是更新后的呢?可以这样理解,我们在最开始补充了令牌,并把时间同步到了此刻(看下面一段),那我就告诉调用方,此时就可以获取令牌了,这一次请求不用等待了。等待的时间让下一次请求承受吧。这也就是所谓的“预支”令牌。
看到这里大家可能还会有一个疑问,不是说最多只能通过2倍的限流量吗?按照这种说法,是不是可以预支n多倍的令牌数,然后让后面的请求等待更多的时间呢?比如我限流10,但是我一次请求100个令牌,让后面的请求等着就行了,那我这一秒的流量不就到100了吗?这里我个人是这样理解的。一般情况下,一次业务请求只需要获取一个令牌。比如我们限流了10,短时间内虽然可能来了100个请求,但是每次请求都是只要获取1个令牌。假设桶里有10个令牌,当第11次请求来的时候,“下次获取令牌的时间”就开始往后推迟了0.1s了,第20个请求来的时候,就推迟了1s了,那么从第0s到第1s这一秒钟的时间内,就通过了20个请求,正好是2倍的限流量了。
刚刚看到reserveEarliestAvailable这个方法最开始有一个resync方法,这个我们再看一下源码:
void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
//补充令牌,注意不能超了
storedPermits = min(maxPermits,
storedPermits + (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros());
//下次可以获取令牌的时间变成现在
nextFreeTicketMicros = nowMicros;
}
}
这个怎么理解呢。其实从之前的reserveEarliestAvailable方法我们可以大概看到,我们只是在请求过来的时候,把桶里存储的令牌减去了,但是没有做补充令牌的操作。根据令牌桶原理,我们需要定时添加令牌,那怎么办呢。我们就在每次请求过来的最开始,计算出差了多少时间,把中间这段时间应该放的令牌补上就可以了。
那为什么会有一个时间判断条件if (nowMicros > nextFreeTicketMicros)呢?注意,如果两次请求间隔很短,那么在这两次请求之间本来就不应该补。那我们怎么知道要不要补充呢?从之前的方法我们看出,我们维护了一个时间,是“下次可以获取令牌的时间”,也就是nextFreeTicketMicros。当一次请求过来,如果这个nextFreeTicketMicros还没到,那么我们就不需要做“补充”操作。如果这个nextFreeTicketMicros已经过去了,那我们就需要把这个时间段内的令牌给补充上。这就是所谓的把时间“同步到此刻”。
举个例子:
到这里SmoothBursty的源码就分析完了。分析到这里有一种感觉,就是看代码逻辑是这么个道理,但是总感觉很抽象,不知道这个代码为什么这么写的。我们再详细举个例子,能更有体感一些。
就假设我们设定了限流值是10,假设在第0秒的时候一下子来了20个请求。
一种情况,在第0秒的时候,桶里有10个令牌,那么第一个请求过来的时候,经过resync,nextFreeTicketMicros是此刻(0s)。因为桶里的令牌数足够,所以此时不需要推迟nextFreeTicketMicros。这次请求完,桶里还剩9个令牌。
以此类推,前10个请求,都不需要推迟nextFreeTicketMicros。完成后,桶里没有令牌了。
第11个请求来了,此时桶里没有令牌了,需要推迟nextFreeTicketMicros,推迟0.1s,但是此时返回的因为是更新前的nextFreeTicketMicros,所以第11个请求还是立刻就通过了。
第12个请求来的时候,此时还是第0s(因为我们假设一瞬间来了20个请求),nextFreeTicketMicros是第0.1s了,此时第12个请求要等0.1s了,并且把nextFreeTicketMicros更新到第0.2s处了。
以此类推,第20个请求来的时候,需要等到第1s才能获取到令牌。由此看出,这1s的时间内,通过了20个请求。
那么如果,在第0s的时候,桶里没有令牌呢,那么从第1个请求开始,nextFreeTicketMicros就会每次都推迟0.1s,那么1s内就只能通过10个请求了。
分析完了SmoothBursty,我们再分析一下SmoothWarmingUp这种实现。SmoothWarmingUp预热这种实现是有点抽象的,但是我们数形结合着来看,可以帮助理解其核心思想。
首先我们理解一下为什么需要预热,如上文所说,如果长时间没有请求过来,一下次来了一个突增的请求,会导致系统的压力比较大。这里就有两个问题,1怎么预热?2怎么衡量系统的冷热程度?
怎么预热呢?我们的目的是让请求通过的不那么快,那么我们让令牌产生的速度变慢,不就可以了吗?换言之,我们让“下次获取令牌的时间”推迟得更多一点不就可以了吗?
怎么衡量系统的冷热程度呢?我们换个角度想,为什么系统会冷,是因为请求来得慢,请求来得慢会有什么表现呢?那就是桶里的令牌多。由此,我们可以用令牌数来表征系统冷热程度,令牌越多,系统越冷,需要让“下次获取令牌的时间”推迟得更多。
我们先来理解一下这张图:
这张图的x轴是桶内的令牌数,thresholdPermits指的是预热阈值,也就是说,当桶内的令牌数超过这个值时,生成每个令牌所需的时间就会成比例增加。stableIntervalMicros表示稳定运行时产生每个令牌需要的时间,coldIntervalMicros表示最冷的时候产生每个令牌需要的时间。
这张图我们一定要理解的点是,最开始系统最冷的时候,桶内令牌数是maxPermits,预热阶段是从最右边往左走的过程。第二个点是,这个坐标轴围成的面积是什么意思,是需要往后推迟的生产令牌的时间!这个点不是很好理解,可以类比稳定阶段来理解,举例来说,比如在稳定阶段,stableIntervalMicros为100ms,某一次请求需要4个令牌,那么下次获取令牌的时间就要往后推迟400ms。而在预热阶段,每消耗一个令牌需要推迟的时间是和桶内令牌数成正比的,因此上图中,围成的梯形面积就是在预热阶段,需要推迟的获取令牌的时间。
ok,有了上面的基本思想,我们再来看源码。源码核心的就是在计算上面提到的推迟时间。
上面提到,在create阶段,create方法中带了参数几个参数
warmupPeriod是预热时长,可以自己指定。3.0是硬编码的coldFactor,即所谓的预热因子,这个coldFactor表示,在系统最冷的时候,产生一个令牌所需的时间是系统稳定运行时产生一个令牌的时间的3倍。
public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod);
return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond, warmupPeriod, unit,
3.0);
}
通过之前的分析我们知道,create的核心方法就是doSetRate,SmoothWarmingUp的实现是下面这样的。
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = maxPermits;
//coldFactor固定是3
double coldIntervalMicros = stableIntervalMicros * coldFactor;
//0.5的系数也是固定的,可以认为令牌的阈值是一个折中值
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
//见下方解释1
maxPermits = thresholdPermits
+ 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
//见下方解释2
slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = 0.0;
} else {
storedPermits = (oldMaxPermits == 0.0)
? maxPermits // initial state is cold
: storedPermits * maxPermits / oldMaxPermits;
}
}
上面代码有两个地方解释下。
变换一下即可得到maxPermits。
最后看一下SmoothWarmingUp的acquire方法。其中获取下一次令牌时间的计算方法如下,大致流程和smoothbursty相似,重点要看计算推迟时间的方法storedPermitsToWaitTime。
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
//计算“下次可获取令牌”需要推迟的时间,见下面的代码分析
long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
try {
this.nextFreeTicketMicros = LongMath.checkedAdd(nextFreeTicketMicros, waitMicros);
} catch (ArithmeticException e) {
this.nextFreeTicketMicros = Long.MAX_VALUE;
}
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
//判断是否会经过预热阶段
if (availablePermitsAboveThreshold > 0.0) {
//注意这一行,将两种情况合并为一个公式计算,具体理解可以看下面的图示
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
//这里计算,在预热阶段的令牌会多出多少时间
//这个就是一个计算梯形面积的公式,(上底+下底)*高/2
micros = (long) (permitsAboveThresholdToTake
* (permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake)) / 2.0);
permitsToTake -= permitsAboveThresholdToTake;
}
// 这个是假设都在稳定阶段,所需要的时间
micros += (stableIntervalMicros * permitsToTake);
return micros;
}
//这个方法是计算在当前令牌数下,生产每个令牌需要的时间
private double permitsToTime(double permits) {
return stableIntervalMicros + permits * slope;
}
看完上面的计算,大家可能还是比较懵,下面再举个例子来对着代码分析一下,就一目了然了。
情况1:全部都在预热阶段,如下图所示。此时对应的需要计算的时间是梯形ABCD的面积。先分别用permitsToTime方法计算出DE和CF的长度,然后算出梯形CDEF的面积,最后加上矩形ABEF的面积,即可。
情况二:一部分在预热阶段,一部分在稳定阶段,如下图所示。此时需要计算的时间为多边形ABCED的面积。注意此时代码里的permitsAboveThresholdToTake和permitsAboveThresholdToTake是相等的,所以代码中计算的梯形面积退化成了三角形CEF的面积(上底退化为0)。最后计算出的面积为CEF+ABFD。
从上面两种情况可以看出,代码中利用这一行permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);将两种情况统一为一个“梯形”面积计算公式,还是很巧妙的。
由此,我们计算得到了消耗permitsToTake个令牌需要推迟的时间,就可以得到下次获取令牌的时间。到这里,SmoothWarmingUp实现也就分析完了。其实说白了,SmoothWarmingUp就是为了预热而推迟了更多的时间,上面那一大段的计算,只是为了计算要多推迟多久而已。
令牌桶算法的原理和RateLimiter的实现就分析到这里了。写完这篇文章也有一些感慨,最开始去看令牌桶算法的时候,几句话就看明白了基本思路,感觉是一个很简单的算法。但是真正去看了RateLimiter的实现源码,才发现实现起来没那么简单,要考虑的因素很多,代码实现中的一些思想也是值得仔细思考的,比如不用定时器怎么模拟生产令牌,怎么应对突发流量,怎么预热等。另外一点感受,一个问题,自己理解了和能让别人理解是有很大差别的,可能按照我自己理解写的这篇文章,也没有完全表述清楚我的想法,这就需要更多的交流和碰撞,在反复提问和反思中才会加深理解。