在实时数据处理系统中,我们经常需要统计某个事件在特定时间窗口内的发生次数,例如:
这类需求通常面临两个核心挑战:
本文将详细介绍如何基于 Redis 实现高性能、高可用的计数方案,并提供完整的Java代码实现。
方案 | QPS | 数据一致性 | 实现复杂度 |
|---|---|---|---|
数据库+事务 | ~1K | 强一致 | 高 |
本地缓存 | ~100K | 最终一致 | 中 |
Redis原子操作 | 50K+ | 强一致 | 低 |
Redis的单线程模型天然适合计数场景,提供INCR/INCRBY等原子命令。
// 格式:业务前缀:appId:deviceId:ip:时间窗口
String key = "flow:count:app123:device456:127.0.0.1:2023080117";public void incrementCount(String key, int delta) {
redisTemplate.opsForValue().increment(key, delta);
}问题:没有过期时间,会导致数据无限堆积
public void incrementWithExpire(String key, int delta, long ttlSeconds) {
redisTemplate.opsForValue().increment(key, delta);
redisTemplate.expire(key, ttlSeconds, TimeUnit.SECONDS);
}新问题:每次操作都设置TTL,造成冗余Redis调用
我们需要确保TTL只在Key创建时设置一次,两种实现方式:
private static final String LUA_SCRIPT =
"local current = redis.call('INCRBY', KEYS[1], ARGV[1])\n" +
"if current == tonumber(ARGV[1]) then\n" +
" redis.call('EXPIRE', KEYS[1], ARGV[2])\n" +
"end\n" +
"return current";
public Long incrementAtomically(String key, int delta, long ttl) {
return redisTemplate.execute(
new DefaultRedisScript<>(LUA_SCRIPT, Long.class),
Collections.singletonList(key),
String.valueOf(delta), String.valueOf(ttl)
);
}优势:
public void incrementWithNX(String key, int delta, long ttl) {
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
StringRedisConnection conn = (StringRedisConnection) connection;
conn.setNX(key, "0"); // 尝试初始化
conn.incrBy(key, delta);
if (conn.setNX(key + ":lock", "1")) { // 简易锁判断首次
conn.expire(key, ttl);
conn.expire(key + ":lock", 10);
}
return null;
});
}适用场景:Redis版本<2.6(不支持Lua)
public long calculateTtlToNextHour() {
LocalDateTime now = LocalDateTime.now();
LocalDateTime nextHour = now.plusHours(1).truncatedTo(ChronoUnit.HOURS);
return ChronoUnit.SECONDS.between(now, nextHour);
}@Component
@RequiredArgsConstructor
public class FlowCounter {
private final RedisTemplate<String, String> redisTemplate;
private static final String KEY_PREFIX = "flow:count:";
@KafkaListener(topics = "${kafka.topic}")
public void handleMessages(List<Message> messages) {
Map<String, Integer> countMap = messages.stream()
.collect(Collectors.toMap(
this::buildKey,
msg -> 1,
Integer::sum
));
countMap.forEach((k, v) ->
incrementAtomically(k, v, calculateTtlToNextHour())
);
}
private String buildKey(Message msg) {
return String.format("%s%s:%s:%s:%s",
KEY_PREFIX,
msg.getAppId(),
msg.getDeviceId(),
msg.getIp(),
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH"))
);
}
}public long getCurrentCount(String appId, String deviceId, String ip) {
String key = buildKey(appId, deviceId, ip);
String val = redisTemplate.opsForValue().get(key);
return val != null ? Long.parseLong(val) : 0L;
}redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
StringRedisConnection conn = (StringRedisConnection) connection;
countMap.forEach((k, v) -> {
conn.incrBy(k, v);
// 可结合Lua脚本进一步优化
});
return null;
});// 在内存中先合并相同Key的计数
Map<String, Integer> localCount = messages.stream()
.collect(Collectors.toMap(
this::buildKey,
m -> 1,
Integer::sum
));{}强制哈希标签,保证相同Key路由到同一节点
"{flow}:count:app123:..."@Retryable(maxAttempts = 3, backoff = @Backoff(delay = 100))
public void safeIncrement(String key, int delta) {
// 业务逻辑
}# TYPE redis_operations_total counter
redis_operations_total{operation="incr"} 12345
redis_operations_total{operation="expire"} 678@Scheduled(fixedRate = 3600000)
public void checkDataConsistency() {
// 对比DB与Redis计数差异
}方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
Lua脚本 | 原子性强,性能最佳 | 需要Redis 2.6+ | 新项目首选 |
SETNX+INCR | 兼容旧版 | 有竞态风险 | 遗留系统 |
纯INCR+TTL | 实现简单 | TTL冗余 | 不推荐生产 |
通过本文的方案,我们实现了:
最佳实践建议:
完整代码示例已上传GitHub:[示例仓库链接]