并发编程-25 高并发处理手段之消息队列思路 + 应用拆分思路 + 应用限流思路
限流作为一种流量控制策略 (通常会和熔断、降级搭配在一起使用,来避免瞬时的大量请求对系统造成负荷,来达到保护服务平稳运行的目的)旨在维护系统的稳定性。然而,限流也带来了平衡用户满意度与系统稳定性之间的挑战。
限流策略的核心意义: 限流是一种管理流量的方法,通过设置最大请求率或并发连接数,以防止系统被过多请求压垮。随着数字服务的普及,限流变得尤为重要,因为高流量可能导致系统崩溃,影响整体可用性。 限流与用户体验的平衡: 限流策略的挑战之一是如何在保持系统稳定的同时提供令人满意的用户体验。过于严格的限流可能导致用户等待时间过长,降低用户满意度。但没有限流,系统可能会被过度请求而崩溃,影响用户所有人。
我们这里将深入探讨限流的集中实现方式
固定窗口算法通过在单位时间内维护一个计数器,能够限制在每个固定的时间段内请求通过的次数,以达到限流的效果。
@Slf4j
public class FixedWindowRateLimiter {
// 时间窗口大小,单位毫秒
private long windowSize;
// 允许通过请求数
private int maxRequestCount;
// 当前窗口通过的请求计数
private AtomicInteger count = new AtomicInteger(0);
// 窗口右边界
private long windowBorder;
public FixedWindowRateLimiter(long windowSize, int maxRequestCount) {
this.windowSize = windowSize;
this.maxRequestCount = maxRequestCount;
windowBorder = System.currentTimeMillis() + windowSize;
}
/**
* 尝试获取许可
*
* @return 若获取成功则返回 true,否则返回 false
*/
public synchronized boolean tryAcquire() {
long currentTime = System.currentTimeMillis();
// 如果窗口边界小于当前时间,则表示窗口重置
if (windowBorder < currentTime) {
log.info("窗口重置");
// 重新计算新的窗口边界,确保窗口大小不变
do {
windowBorder += windowSize;
} while (windowBorder < currentTime);
// 重置计数器
count = new AtomicInteger(0);
}
// 如果请求计数小于允许的最大请求数
if (count.intValue() < maxRequestCount) {
log.info("获取许可成功");
// 计数增加并返回成功
count.incrementAndGet();
return true;
} else {
log.info("获取许可失败");
return false;
}
}
}
通过构造方法中的参数指定时间窗口大小以及允许通过的请求数量,当请求进入时先比较当前时间是否超过窗口上边界,未越界且未超过计数器上限则可以放行请求。
测试
public static void main(String[] args) throws InterruptedException {
// 在1000毫秒内通过5个请求
FixedWindowRateLimiter fixedWindowRateLimiter = new FixedWindowRateLimiter(1000, 5);
for (int i = 0; i < 10; i++) {
if (fixedWindowRateLimiter.tryAcquire()) {
System.out.println("执行任务");
}else{
System.out.println("被限流");
TimeUnit.MILLISECONDS.sleep(300);
}
}
}
缺点 :临界问题
假设有一个恶意用户,在0:59时,瞬间发送了100个请求,并且1:00又瞬间发送了100个请求,那么其实这个用户在1秒里面,瞬间发送了200个请求。我们刚才规定的是1分钟最多100个请求,也就是每秒钟最多1.7个请求,用户通过在时间窗口的重置节点处突发请求,可以瞬间超过我们的速率限制。用户有可能通过算法的这个漏洞,瞬间压垮我们的应用。
刚才的问题其实是因为我们统计的精度太低。那么如何很好地处理这个问题呢?或者说,如何将临界问题的影响降低呢?我们可以看下面的滑动窗口算法
滑动窗口算法在固定窗口的基础上,进行了一定的升级改造。它的算法的核心在于将时间窗口进行了更精细的分片,将固定窗口分为多个小块,每次仅滑动一小块的时间。
并且在每个时间段内都维护了单独的计数器,每次滑动时,都减去前一个时间块内的请求数量,并再添加一个新的时间块到末尾,当时间窗口内所有小时间块的计数器之和超过了请求阈值时,就会触发限流操作。
该算法的实现,核心就是通过一个int类型的数组循环使用来维护每个时间片内独立的计数器:
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SlidingWindowRateLimiter {
// 时间窗口大小,单位毫秒
private long windowSize;
// 分片窗口数
private int shardNum;
// 允许通过请求数
private int maxRequestCount;
// 各个窗口内请求计数
private int[] shardRequestCount;
// 请求总数
private int totalCount;
// 当前窗口下标
private int shardId;
// 每个小窗口大小,毫秒
private long tinyWindowSize;
// 窗口右边界
private long windowBorder;
public SlidingWindowRateLimiter(long windowSize, int shardNum, int maxRequestCount) {
this.windowSize = windowSize;
this.shardNum = shardNum;
this.maxRequestCount = maxRequestCount;
shardRequestCount = new int[shardNum];
tinyWindowSize = windowSize / shardNum;
windowBorder = System.currentTimeMillis();
}
/**
* 尝试获取许可
*
* @return 若获取成功则返回 true,否则返回 false
*/
public synchronized boolean tryAcquire() {
long currentTime = System.currentTimeMillis();
// 如果当前时间超过窗口右边界,进行窗口滑动操作
if (currentTime > windowBorder) {
do {
shardId = (++shardId) % shardNum;
totalCount -= shardRequestCount[shardId];
shardRequestCount[shardId] = 0;
windowBorder += tinyWindowSize;
} while (windowBorder < currentTime);
}
// 如果请求总数小于允许的最大请求数
if (totalCount < maxRequestCount) {
log.info("获取许可成功。窗口ID:{}", shardId);
shardRequestCount[shardId]++;
totalCount++;
return true;
} else {
log.info("获取许可失败。窗口ID:{}", shardId);
return false;
}
}
}
测试
// 对第一个例子中的规则进行修改,每1秒允许100个请求通过不变,在此基础上再把每1秒等分为10个0.1秒的窗口。
SlidingWindowRateLimiter slidingWindowRateLimiter
= new SlidingWindowRateLimiter(1000, 10, 10);
TimeUnit.MILLISECONDS.sleep(800);
for (int i = 0; i < 15; i++) {
boolean acquire = slidingWindowRateLimiter.tryAcquire();
if (acquire){
System.out.println("执行任务");
}else{
System.out.println("被限流");
}
TimeUnit.MILLISECONDS.sleep(10);
}
程序启动后,在先休眠了一段时间后再发起请求,可以看到在0.9秒到1秒的时间窗口内放行了6个请求,在1秒到1.1秒内放行了4个请求,随后就进行了限流,解决了在固定窗口算法中相邻时间窗口内允许通过大量请求的问题。
滑动窗口算法通过将时间片进行分片,对流量的控制更加精细化,但是相应的也会浪费一些存储空间,用来维护每一块时间内的单独计数,并且还没有解决固定窗口中可能出现的流量激增问题。
为了应对流量激增的问题,后续又衍生出了漏桶算法,用专业一点的词来说,漏桶算法能够进行流量整形和流量控制。
漏桶是一个很形象的比喻,外部请求就像是水一样不断注入水桶中,而水桶已经设置好了最大出水速率,漏桶会以这个速率匀速放行请求,而当水超过桶的最大容量后则被丢弃。
@Slf4j
public class LeakyBucketRateLimiter {
// 桶的容量
private int capacity;
// 桶中现存水量
private AtomicInteger water = new AtomicInteger(0);
// 开始漏水时间
private long leakTimeStamp;
// 水流出的速率,即每秒允许通过的请求数
private int leakRate;
public LeakyBucketRateLimiter(int capacity, int leakRate) {
this.capacity = capacity;
this.leakRate = leakRate;
}
/**
* 尝试获取许可
*
* @return 若获取成功则返回 true,否则返回 false
*/
public synchronized boolean tryAcquire() {
long currentTime = System.currentTimeMillis();
// 如果桶中没有水,重新开始计算
if (water.get() == 0) {
log.info("开始漏水");
leakTimeStamp = currentTime;
water.incrementAndGet();
return water.get() < capacity;
}
// 先漏水,计算已漏水量
int leakedWater = (int) ((currentTime - leakTimeStamp) / 1000 * leakRate);
log.info("上次漏水时间:{}, 当前时间:{}. 已漏水量:{}", leakTimeStamp, currentTime, leakedWater);
// 如果已漏水量不为0,更新桶中水量
if (leakedWater != 0) {
int leftWater = water.get() - leakedWater;
// 可能水已漏光,设为0
water.set(Math.max(0, leftWater));
leakTimeStamp = currentTime;
}
log.info("剩余容量:{}", capacity - water.get());
// 如果桶还有容量,获取许可
if (water.get() < capacity) {
log.info("获取许可成功");
water.incrementAndGet();
return true;
} else {
log.info("获取许可失败");
return false;
}
}
}
测试
LeakyBucketRateLimiter leakyBucketRateLimiter
= new LeakyBucketRateLimiter(3, 1);
for (int i = 0; i < 15; i++) {
if (leakyBucketRateLimiter.tryAcquire()) {
System.out.println("执行任务");
} else {
System.out.println("被限流");
}
TimeUnit.MILLISECONDS.sleep(500);
}
先初始化一个漏桶,设置桶的容量为3,每秒放行1个请求,在代码中每500毫秒尝试请求1次
09:01:58.404 [main] INFO com.artisan.LeakyBucketRateLimiter - start leaking
执行任务
09:01:58.911 [main] INFO com.artisan.LeakyBucketRateLimiter - lastTime:1693530118410, currentTime:1693530118911. LeakedWater:0
09:01:58.914 [main] INFO com.artisan.LeakyBucketRateLimiter - 剩余容量:2
09:01:58.915 [main] INFO com.artisan.LeakyBucketRateLimiter - tryAcquire success
执行任务
09:01:59.422 [main] INFO com.artisan.LeakyBucketRateLimiter - lastTime:1693530118410, currentTime:1693530119422. LeakedWater:1
09:01:59.422 [main] INFO com.artisan.LeakyBucketRateLimiter - 剩余容量:2
09:01:59.422 [main] INFO com.artisan.LeakyBucketRateLimiter - tryAcquire success
执行任务
09:01:59.934 [main] INFO com.artisan.LeakyBucketRateLimiter - lastTime:1693530119422, currentTime:1693530119934. LeakedWater:0
09:01:59.934 [main] INFO com.artisan.LeakyBucketRateLimiter - 剩余容量:1
09:01:59.934 [main] INFO com.artisan.LeakyBucketRateLimiter - tryAcquire success
执行任务
09:02:00.444 [main] INFO com.artisan.LeakyBucketRateLimiter - lastTime:1693530119422, currentTime:1693530120444. LeakedWater:1
09:02:00.444 [main] INFO com.artisan.LeakyBucketRateLimiter - 剩余容量:1
09:02:00.444 [main] INFO com.artisan.LeakyBucketRateLimiter - tryAcquire success
执行任务
09:02:00.976 [main] INFO com.artisan.LeakyBucketRateLimiter - lastTime:1693530120444, currentTime:1693530120975. LeakedWater:0
09:02:00.976 [main] INFO com.artisan.LeakyBucketRateLimiter - 剩余容量:0
09:02:00.976 [main] INFO com.artisan.LeakyBucketRateLimiter - tryAcquire fail
被限流
09:02:01.489 [main] INFO com.artisan.LeakyBucketRateLimiter - lastTime:1693530120444, currentTime:1693530121489. LeakedWater:1
09:02:01.490 [main] INFO com.artisan.LeakyBucketRateLimiter - 剩余容量:1
09:02:01.490 [main] INFO com.artisan.LeakyBucketRateLimiter - tryAcquire success
执行任务
09:02:02.002 [main] INFO com.artisan.LeakyBucketRateLimiter - lastTime:1693530121490, currentTime:1693530122002. LeakedWater:0
09:02:02.002 [main] INFO com.artisan.LeakyBucketRateLimiter - 剩余容量:0
09:02:02.002 [main] INFO com.artisan.LeakyBucketRateLimiter - tryAcquire fail
被限流
09:02:02.515 [main] INFO com.artisan.LeakyBucketRateLimiter - lastTime:1693530121490, currentTime:1693530122515. LeakedWater:1
09:02:02.515 [main] INFO com.artisan.LeakyBucketRateLimiter - 剩余容量:1
09:02:02.515 [main] INFO com.artisan.LeakyBucketRateLimiter - tryAcquire success
执行任务
09:02:03.016 [main] INFO com.artisan.LeakyBucketRateLimiter - lastTime:1693530122515, currentTime:1693530123016. LeakedWater:0
09:02:03.016 [main] INFO com.artisan.LeakyBucketRateLimiter - 剩余容量:0
09:02:03.016 [main] INFO com.artisan.LeakyBucketRateLimiter - tryAcquire fail
被限流
09:02:03.531 [main] INFO com.artisan.LeakyBucketRateLimiter - lastTime:1693530122515, currentTime:1693530123531. LeakedWater:1
09:02:03.531 [main] INFO com.artisan.LeakyBucketRateLimiter - 剩余容量:1
09:02:03.531 [main] INFO com.artisan.LeakyBucketRateLimiter - tryAcquire success
执行任务
09:02:04.042 [main] INFO com.artisan.LeakyBucketRateLimiter - lastTime:1693530123531, currentTime:1693530124042. LeakedWater:0
09:02:04.042 [main] INFO com.artisan.LeakyBucketRateLimiter - 剩余容量:0
09:02:04.042 [main] INFO com.artisan.LeakyBucketRateLimiter - tryAcquire fail
被限流
09:02:04.553 [main] INFO com.artisan.LeakyBucketRateLimiter - lastTime:1693530123531, currentTime:1693530124553. LeakedWater:1
09:02:04.553 [main] INFO com.artisan.LeakyBucketRateLimiter - 剩余容量:1
09:02:04.553 [main] INFO com.artisan.LeakyBucketRateLimiter - tryAcquire success
执行任务
09:02:05.066 [main] INFO com.artisan.LeakyBucketRateLimiter - lastTime:1693530124553, currentTime:1693530125066. LeakedWater:0
09:02:05.066 [main] INFO com.artisan.LeakyBucketRateLimiter - 剩余容量:0
09:02:05.066 [main] INFO com.artisan.LeakyBucketRateLimiter - tryAcquire fail
被限流
09:02:05.580 [main] INFO com.artisan.LeakyBucketRateLimiter - lastTime:1693530124553, currentTime:1693530125580. LeakedWater:1
09:02:05.580 [main] INFO com.artisan.LeakyBucketRateLimiter - 剩余容量:1
09:02:05.580 [main] INFO com.artisan.LeakyBucketRateLimiter - tryAcquire success
执行任务
Process finished with exit code 0
漏桶算法的缺点,不管当前系统的负载压力如何,所有请求都得进行排队,即使此时服务器的负载处于相对空闲的状态,这样会造成系统资源的浪费。由于漏桶的缺陷比较明显,所以在实际业务场景中,使用的比较少。
令牌桶算法
令牌桶算法是基于漏桶算法的一种改进,主要在于令牌桶算法能够在限制服务调用的平均速率的同时,还能够允许一定程度内的突发调用。
它的主要思想是系统以恒定的速度生成令牌,并将令牌放入令牌桶中,当令牌桶中满了的时候,再向其中放入的令牌就会被丢弃。而每次请求进入时,必须从令牌桶中获取一个令牌,如果没有获取到令牌则被限流拒绝。
假设令牌的生成速度是每秒100个,并且第一秒内只使用了70个令牌,那么在第二秒可用的令牌数量就变成了130,在允许的请求范围上限内,扩大了请求的速率。当然,这里要设置桶容量的上限,避免超出系统能够承载的最大请求数量。
Guava中的RateLimiter就是基于令牌桶实现的
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
void acquireTest(){
RateLimiter rateLimiter=RateLimiter.create(5);
for (int i = 0; i < 10; i++) {
double time = rateLimiter.acquire();
log.info("等待时间:{}s",time);
}
}
可以看到 每200ms左右产生一个令牌并放行请求,也就是1秒放行5个请求,使用RateLimiter能够很好的实现单机的限流。
前面提到的突发流量情况,令牌桶是怎么解决的呢?RateLimiter中引入了一个预消费 的概念。
翻译一下:
申请令牌的数量 不同不会影响这个申请令牌这个动作本身的响应时间,acquire(1)和acquire(1000)这两个请求会消耗同样的时间返回结果,但是会影响下一个请求的响应时间。
如果一个消耗大量令牌的任务到达空闲 的RateLimiter
,会被立即批准执行,但是当下一个请求进来时,将会额外等待一段时间,用来支付前一个请求的时间成本。
举个例子: 当一个系统处于空闲状态时,突然来了1个需要消耗100个令牌的任务,那么白白等待100秒是毫无意义的浪费资源行为,那么可以先允许它执行,并对后续请求进行限流时间上的延长,以此来达到一个应对突发流量的效果。
void acquireMultiTest(){
RateLimiter rateLimiter=RateLimiter.create(1);
for (int i = 0; i <3; i++) {
int num = 2 * i + 1;
log.info("获取{}个令牌", num);
double cost = rateLimiter.acquire(num);
log.info("获取{}个令牌结束,耗时{}s",num,cost);
}
}
可以看到,在第二次请求时需要3个令牌,但是并没有等3秒后才获取成功,而是在等第一次的1个令牌所需要的1秒偿还后,立即获得了3个令牌得到了放行。
同样,第三次获取5个令牌时等待的3秒是偿还的第二次获取令牌的时间,偿还完成后立即获取5个新令牌,而并没有等待全部重新生成完成。
RateLimiter还具有平滑预热功能,下面的代码就实现了在启动3秒内,平滑提高令牌发放速率到每秒5个的功能
void acquireSmoothly(){
RateLimiter rateLimiter =RateLimiter.create(5,3, TimeUnit.SECONDS);
long startTimeStamp = System.currentTimeMillis();
for (int i = 0; i < 15; i++) {
double time = rateLimiter.acquire();
log.info("等待时间:{}s, 总时间:{}ms" ,time,System.currentTimeMillis()-startTimeStamp);
}
}
可以看到,令牌发放时间从最开始的500ms多逐渐缩短,在3秒后达到了200ms左右的匀速发放。
https://docs.spring.io/spring-cloud-gateway/docs/3.1.8/reference/html/#the-redis-ratelimiter