
一、为什么需要限流?
二、基于redis实现限流思考
三、实现方案
四、使用方式
五、延伸思考
1.防刷
2.防黑灰产
3.保护服务以及背后资源
至于市面上流行的限流算法和实现方式,此处不再展开介绍,我们主要针对业务接口维度的限流做分析。
谈到限流,分为单机和分布式限流,因为现在的服务基本都是集群部署,所以主要聊聊分布式限流,既然谈到分布式流控,那么一定少不了中心化节点来做全局管控,而redis又是应用必不可少的基础构件,那么我们就聊一下基于redis实现分布式流控。

请求到服务层时,去redis申请访问资源,如果拿到就继续调用业务逻辑,如果拿不到返回访问受限。
需要考虑以下几点:
redis单实例的情况下不需要考虑,如果redis是集群模式,那么需要考虑主节点漂移问题,可参考redisson红锁实现,节点过半机制。
实现限流,无非是redis放一个key,然后设置过期时间,而两个操作单独调用无法实现原子性,刚好redis2.6版本以后,只吃了lua脚本,只需要将设置key和过期命令放到lua脚本里边,即可实现原子性。
为了保证限流机制不过度侵入业务逻辑,需要把流控实现抽象出来,然后api通过注解的方式依赖和实现。
支持每个业务接口流控可自定义,并且可以通过简单的参数配置即可实现流控。

抽象出limiter-starter组件,编写流控逻辑,然后应用服务需要依赖。
通过lua脚本操作redis,来实现原子性。
local c
c = redis.call('get',KEYS[1])
if c and tonumber(c) > tonumber(ARGV[1]) then
return c;
end
c = redis.call('incr',KEYS[1])
if tonumber(c) == 1 then
redis.call('expire',KEYS[1],ARGV[2])
end
return c;@Slf4j
public class RateLimiter {
protected RedisSerializer keySerializer = new StringRedisSerializer();
protected RedisSerializer valueSerializer = new Jackson2JsonRedisSerializer(Object.class);
@Autowired
protected StringRedisTemplate stringRedisTemplate;
public boolean tryAccess(String key,long limitCount,int seconds) {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(LUA_SCRIPT, Long.class);
List<String> keys = new ArrayList<>(2);
keys.add(key);
Long count = stringRedisTemplate.execute(redisScript
, this.valueSerializer
, this.keySerializer
, keys
, limitCount
,seconds);
return count <= limitCount;
}
}限流注解
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimit {
String key() default "";
@AliasFor("key")
String value() default "";
int limitCount();
int seconds();
}拦截器:
@Slf4j
@Aspect
@Order(Ordered.HIGHEST_PRECEDENCE + 1)
public class RateLimiterInterceptor {
@Autowired
private RateLimiter rateLimiter;
private ExpressionParser parser = new SpelExpressionParser();
private LocalVariableTableParameterNameDiscoverer discoverer = new LocalVariableTableParameterNameDiscoverer();
@Before("@annotation(com.limiter.starter.annotation.RateLimit)")
public Object before(ProceedingJoinPoint pjp) throws Throwable {
MethodSignature joinPointObject = (MethodSignature) pjp.getSignature();
Method method = joinPointObject.getMethod();
RateLimit limit = method.getAnnotation(RateLimit.class);
if(null == limit) {
return pjp.proceed();
}
Object[] args = pjp.getArgs();
String[] params = discoverer.getParameterNames(method);
EvaluationContext context = new StandardEvaluationContext();
for (int i = 0; i < params.length; i++) {
context.setVariable(params[i],args[i]);
}
String keySpel = limit.key();
Expression keyExpression = parser.parseExpression(keySpel);
String key = keyExpression.getValue(context,String.class);
key = method.getName() + "#" + key;
boolean accessable = true;
try {
accessable = this.rateLimiter.tryAccess(key, limit.limitCount(), limit.seconds());
} catch (Exception e) {
log.error("RateLimiterInterceptor.before check limit occur error;key={}",key,e);
accessable = true;
}
if(!accessable) {
return CommonResult.getFailureResult(EntityError.IP_LIMIT);
}
return pjp.proceed();
}
}支持Spring spel风格注解使用与解析。
主配置类:
@Configuration
@ConditionalOnBean(RedisConnectionFactory.class)
@Slf4j
public class RateLimitConfiguration {
@Bean
@ConditionalOnMissingBean
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
@Bean
@ConditionalOnMissingBean(RateLimiter.class)
public RateLimiter rateLimiter() {
return new RateLimiter();
}
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
@ConditionalOnMissingBean(RateLimiter.class)
public RateLimiterInterceptor rateLimiterInterceptor() {
return new RateLimiterInterceptor();
}
}spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.limiter.starter.config.RateLimitConfiguration<dependency>
<groupId>xxx</groupId>
<artifactId>limiter-starter</artifactId>
</dependency>限流starter依赖redis,如果配置了redisTemplate则可以忽略,没有的话需要补充。
@RateLimit(key="xxx + #req.userId",limitCount=5,seconds=2)
public void methodA(HttpServletRequest request, @RequestBody XxxReq req) {
//todo
}这样就实现了接口粒度的限流操作。

比如某个接口限流100/1min,那么如果达到限流上限后,redis重新选主,新的master不持有限流key,那么新的请求还能进来,导致限流短时间失控。
如果接口注解层面没有指定key,那么要根据调用方法和ip生成默认流控策略,比如单ip某个方法调用单位时间内限制多少次访问。
如果粒度比较大的情况限流策略,100/min,那么在达到流控上限后,新的请求过来再去调用redis做检查,其实没有太大意义,并且增加了网络损耗,这种情况下可以考虑做本地化,到达流控上限后,本地化持有一份具有过期时间的策略,新的流量进来之后,可以直接从本地检查返回。当然这种缓存一致性也是个需要酌量的问题。
本文分享自 PersistentCoder 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!