首先,分析一下主从+哨兵模式带来的问题:
因此,在 Redis 3.0 之后,提供了 Cluster 的解决方案,核心原理是对数据做分片:
和 Sentinel 类似,Cluster 也存在服务监控和选举规则:
当某个节点被标记为客观下线后,会从该主节点的从节点中选举一个从节点作为新的主节点:
注意:
顺序分布:
一致性哈希:
如果采用一致性哈希算法,若某个节点挂了,受影响的数据仅仅是此节点到环空间前一个节点(沿着逆时针方向行走遇到的第一个节点)之间的数据,其它不受影响。增加一个节点也同理。
但是当删除节点时,数据再分配会把当前节点所有数据加到它的下一个节点上(缓存抖动)。这样会导致下一个节点使用率暴增,可能会导致挂掉,如果下一个节点挂掉,下下个节点将会承受更大的压力,最终导致集群雪崩。
Redis 并没有使用一致性哈希,而是采用哈希槽的方式进行分片
Redis 集群有16384个哈希槽,每个key通过CRC16校验后对16384取模来决定放置哪个槽:
理论上 CRC16 算法可以得到 2^{16} 个数值,其数值范围在 0-65535 之间,取模运算 key 的时候,应该是CRC16(key)%65535,但是却设计为CRC16(key)%16384,原因是作者在设计的时候做了空间上的权衡,觉得节点最多不可能超过1000个,同时为了保证节点之间通信效率,所以采用了 2^{14}。
具体分片方式如下:
使用哈希槽的优势:
注意:
本章节的 demo 代码示例,免搭建即开即用:learn-redis-demo
基于 Redis 实现分布式锁主要依赖于 SETNX
命令:
SETNX key value
:若不存在 key 则设置 key 值为 value,返回 1为了防止某个线程获取锁之后异常结束没有释放锁,导致其他线程调用 SETNX
命令返回 0 而进入死锁,因此加锁后需要设置超时时间
以下是一个简单的 SpringBoot demo:
@RestController
@RequestMapping("/sell")
public class AppController {
@Resource
StringRedisTemplate stringRedisTemplate;
String LOCK = "TICKETSELLER";
String KEY = "TICKET";
@GetMapping("/ticket")
public void sellTicket() {
Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(LOCK, "1");
if (Boolean.TRUE.equals(isLocked)) {
// 设置过期时间 5s
stringRedisTemplate.expire(LOCK, 5, TimeUnit.SECONDS);
try {
// 拿到 ticket 的数量
int ticketCount = Integer.parseInt((String) stringRedisTemplate.opsForValue().get(KEY));
if (ticketCount > 0) {
// 扣减库存
stringRedisTemplate.opsForValue().set(KEY, String.valueOf(ticketCount - 1));
System.out.println("I get a ticket!");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放锁
stringRedisTemplate.delete(LOCK);
}
} else {
System.out.println("Field");
}
}
}
SETNX
创建了锁,假如这个服务在创建锁之后由于事故导致直接停机,那么这个锁就是一个永不过期的锁解决方案:
@RestController
@RequestMapping("/sell")
public class AppController {
@Resource
StringRedisTemplate stringRedisTemplate;
String LOCK = "TICKETSELLER";
String KEY = "TICKET";
@GetMapping("/ticket")
public void sellTicket() {
// LUA 脚本
String LUA Script =
"if redis.call('setnx',KEYS[1],ARGV[1]) == 1 " +
"then redis.call('expire',KEYS[1],ARGV[2]) ;" +
"return true " +
"else return false " +
"end";
// 回调函数返回加锁状态
Boolean isLocked = stringRedisTemplate.execute(new RedisCallback<Boolean>() {
@Override
public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
return connection.eval(LUA Script.getBytes(),
ReturnType.BOOLEAN,
1,
LOCK.getBytes(),
"1".getBytes(),
"5".getBytes());
}
});
if (Boolean.TRUE.equals(isLocked)) {
try {
int ticketCount = Integer.parseInt((String) stringRedisTemplate.opsForValue().get(KEY));
if (ticketCount > 0) {
stringRedisTemplate.opsForValue().set(KEY, String.valueOf(ticketCount - 1));
System.out.println("I get a ticket!");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
stringRedisTemplate.delete(LOCK);
}
} else {
System.out.println("Field");
}
}
}
假设现有服务 A 和服务 B,A 先拿到锁执行业务,但是由于业务过长导致 A 的锁到期后超时释放:
针对第一种问题的出现,解决方案很简单,只需要对锁的值做出限制即可:
@RestController
@RequestMapping("/sell")
public class AppController {
@Resource
StringRedisTemplate stringRedisTemplate;
String LOCK = "TICKETSELLER";
String KEY = "TICKET"; // 记得在 redis 里面设置好 TICKET 的数量
@GetMapping("/ticket")
public void sellTicket() {
String lockLuaScript =
"if redis.call('setnx',KEYS[1],ARGV[1]) == 1 " +
"then redis.call('expire',KEYS[1],ARGV[2]) ;" +
"return true " +
"else return false " +
"end";
// 生产环境替换为 uuid + 线程 id
String VALUE = String.valueOf(Thread.currentThread().getId());
Boolean isLocked = stringRedisTemplate.execute(new RedisCallback<Boolean>() {
@Override
public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
return connection.eval(lockLuaScript.getBytes(),
ReturnType.BOOLEAN,
1,
LOCK.getBytes(),
VALUE.getBytes(), // 用于判断是否为当前线程加的锁
"5".getBytes()
);
}
});
if (Boolean.TRUE.equals(isLocked)) {
try {
int ticketCount = Integer.parseInt((String) stringRedisTemplate.opsForValue().get(KEY));
if (ticketCount > 0) {
stringRedisTemplate.opsForValue().set(KEY, String.valueOf(ticketCount - 1));
System.out.println("I get a ticket!");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// // 判断是否是自己加的锁,如果是则释放 缺点:非原子操作
// String LOCK_ID = stringRedisTemplate.opsForValue().get(LOCK);
// if (LOCK_ID != null && LOCK_ID.equals(VALUE)) {
// stringRedisTemplate.delete(LOCK);
// }
String unlockLuaScript =
"if redis.call('get',KEYS[1]) == ARGV[1] " +
"then redis.call('del',KEYS[1]); " +
"return true " +
"else return false " +
"end";
stringRedisTemplate.execute(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
return connection.eval(unlockLuaScript.getBytes(),
ReturnType.BOOLEAN,
1,
LOCK.getBytes(),
VALUE.getBytes()
);
}
});
}
} else {
System.out.println("Field");
}
}
}
针对第二种问题,我们可以利用看门狗机制实现:
@RestController
@RequestMapping("/sell")
public class AppController {
@Resource
StringRedisTemplate stringRedisTemplate;
String LOCK = "TICKETSELLER";
String KEY = "TICKET"; // 记得在 redis 里面设置好 TICKET 的数量
@GetMapping("/ticket")
public void sellTicket() {
String lockLuaScript =
"if redis.call('setnx',KEYS[1],ARGV[1]) == 1 " +
"then redis.call('expire',KEYS[1],ARGV[2]) ;" +
"return true " +
"else return false " +
"end";
// 生产环境替换为 uuid + 线程 id
String VALUE = String.valueOf(Thread.currentThread().getId());
Boolean isLocked = stringRedisTemplate.execute(new RedisCallback<Boolean>() {
@Override
public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
return connection.eval(lockLuaScript.getBytes(),
ReturnType.BOOLEAN,
1,
LOCK.getBytes(),
VALUE.getBytes(), // 用于判断是否为当前线程加的锁
"5".getBytes()
);
}
});
if (Boolean.TRUE.equals(isLocked)) {
// 判断是否是自己加的锁,如果是则续期
String addlockLuaScript =
"if redis.call('get',KEYS[1]) == ARGV[1] " +
"then redis.call('expire',KEYS[1], ARGV[2]) ; " +
"return true " +
"else return false " +
"end";
Thread watchDoge = new Thread(() -> {
while (Boolean.TRUE.equals(stringRedisTemplate.execute(new RedisCallback<Boolean>() {
@Override
public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
return connection.eval(addlockLuaScript.getBytes(),
ReturnType.BOOLEAN,
1,
LOCK.getBytes(),
VALUE.getBytes(),
"5".getBytes());
}
})) && !Thread.currentThread().isInterrupted()) {
try {
System.out.println(Thread.currentThread().isInterrupted());
Thread.sleep(4000);
} catch (Exception e) {
break;
}
}
});
watchDoge.setDaemon(true);
watchDoge.start();
try {
int ticketCount = Integer.parseInt((String) stringRedisTemplate.opsForValue().get(KEY));
if (ticketCount > 0) {
stringRedisTemplate.opsForValue().set(KEY, String.valueOf(ticketCount - 1));
// Thread.sleep(10000000); // 在这里睡一下,可以到 redis 里面 TTL TICKETSELLER 查看锁是否被续期
watchDoge.interrupt();
System.out.println("I get a ticket!");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
String unlockLuaScript =
"if redis.call('get',KEYS[1]) == ARGV[1] " +
"then redis.call('del',KEYS[1]); " +
"return true " +
"else return false " +
"end";
stringRedisTemplate.execute(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
return connection.eval(unlockLuaScript.getBytes(),
ReturnType.BOOLEAN,
1,
LOCK.getBytes(),
VALUE.getBytes()
);
}
});
}
} else {
System.out.println("Field");
}
}
}
对于上面的看门狗机制,其实是一个极其朴素的实现,实际上需要考虑到的东西还有很多。
另外上述的实现仍缺少一些高级应用场景的功能:
而这些功能想要自己去实现是非常麻烦的,因此一般利用 Redisson 实现分布式锁。
Redisson 内置了一系列的分布式对象,分式集合,分布式锁,分布式服务等诸多功能特性,是一款基于 Redis 实现,拥有一系列分布式 系统功能特性的工具包,是实现分布式系统架构中缓存中间件的最佳选择。
引入依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.27.2</version>
</dependency>
编写 Redisson 配置类:
@Configuration
public class RedissonConfig {
// 构建 Redisson 客户端配置
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
return Redisson.create(config);
}
}
注入到 Controller:
@RestController
@RequestMapping("/redisson")
public class RedissonAppController {
String LOCK = "REIDSSON:TICKETSELLER";
String KEY = "TICKET";
// 注入
@Resource
RedissonClient redissonClient;
@Resource
StringRedisTemplate stringRedisTemplate;
@GetMapping("/sell/ticket")
public void redissonSellTicket() {
// 加锁
RLock rLock = redissonClient.getLock(LOCK);
rLock.lock();
try {
int count = Integer.parseInt((String) stringRedisTemplate.opsForValue().get(KEY));
if (count > 0) {
stringRedisTemplate.opsForValue().set(KEY, String.valueOf(count - 1));
System.out.println("Reidsson get ticket");
} else {
System.out.println("Field");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放锁
rLock.unlock();
}
}
}
首先来看 package org.redisson
包下的 lock
方法的具体实现:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
/*
* 这里调用 tryAcquire 尝试获取锁
* 如果为 null 说明获取到了锁
* 如果不是 null 说明其他线程持有锁
* 这个方法最底层的实现其实也是 LUA 脚本
*/
Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
if (ttl != null) {
// 发布订阅,非阻塞锁
CompletableFuture<RedissonLockEntry> future = this.subscribe(threadId);
this.pubSub.timeout(future);
RedissonLockEntry entry;
if (interruptibly) {
entry = (RedissonLockEntry)this.commandExecutor.getInterrupted(future);
} else {
entry = (RedissonLockEntry)this.commandExecutor.get(future);
}
try {
while(true) {
// 仍然尝试获取锁
ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
if (ttl == null) {
return;
}
if (ttl >= 0L) {
try {
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException var14) {
if (interruptibly) {
throw var14;
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else if (interruptibly) {
entry.getLatch().acquire();
} else {
entry.getLatch().acquireUninterruptibly();
}
}
} finally {
// 取消订阅频道
this.unsubscribe(entry, threadId);
}
}
}
接下来我们看 tryAcquire
尝试获取锁的方法 tryAcquireOnceAsync
:
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture acquiredFuture;
if (leaseTime > 0L) {
acquiredFuture = this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
// 未设定过期时间走这个默认的
acquiredFuture = this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
CompletionStage<Boolean> acquiredFuture = this.handleNoSync(threadId, acquiredFuture);
CompletionStage<Boolean> f = acquiredFuture.thenApply((acquired) -> {
if (acquired) {
if (leaseTime > 0L) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
this.scheduleExpirationRenewal(threadId);
}
}
return acquired;
});
return new CompletableFutureWrapper(f);
}
可以看到 tryLockInnerAsync
传参中多了一个参数 this.internalLockLeaseTime
,这个东西的初始化在:
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
// 看门狗机制的续期时间
this.internalLockLeaseTime = this.getServiceManager().getCfg().getLockWatchdogTimeout();
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
这个续期时间的默认值可以在 Config 里面找到:
public Config() {
this.transportMode = TransportMode.NIO;
// 默认是 30s
this.lockWatchdogTimeout = 30000L;
this.checkLockSyncedSlaves = true;
this.slavesSyncTimeout = 1000L;
this.reliableTopicWatchdogTimeout = TimeUnit.MINUTES.toMillis(10L);
this.keepPubSubOrder = true;
this.useScriptCache = false;
this.minCleanUpDelay = 5;
this.maxCleanUpDelay = 1800;
this.cleanUpKeysAmount = 100;
this.nettyHook = new DefaultNettyHook();
this.useThreadClassLoader = true;
this.addressResolverGroupFactory = new SequentialDnsAddressResolverFactory();
this.protocol = Protocol.RESP2;
}
最后我们回到最底层的 tryLockInnerAsync
方法:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return this.evalWriteSyncedAsync(this.getRawName(), LongCodec.INSTANCE, command, "if ((redis.call('exists', KEYS[1]) == 0) or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
}
这里实际执行的是一个 LUA 脚本:
-- 判断是否存在 KEY,不存在则加锁
if (redis.call('exists', KEYS[1]) == 0) or (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1); -- 对 hash 的字段加一
redis.call('pexpire', KEYS[1], ARGV[1]); -- 设置过期时间
return nil;
else
return redis.call('pttl', KEYS[1]); -- 存在锁说明其他线程占有,返回过期时间
end;
这里新版的 Redisson 加锁的逻辑简化了,以前是区分了加锁和可重入,现在进行了合并。
继续看 tryAcquireAsync
方法,在获取到锁后,走 scheduleExpirationRenewal
的逻辑:
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture ttlRemainingFuture;
if (leaseTime > 0L) {
ttlRemainingFuture = this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> s = this.handleNoSync(threadId, ttlRemainingFuture);
RFuture<Long> ttlRemainingFuture = new CompletableFutureWrapper(s);
CompletionStage<Long> f = ttlRemainingFuture.thenApply((ttlRemaining) -> {
if (ttlRemaining == null) {
if (leaseTime > 0L) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
this.scheduleExpirationRenewal(threadId); // 锁续期
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper(f);
}
点进去看 scheduleExpirationRenewal
方法:
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
this.renewExpiration(); // 创建定时任务
} finally {
if (Thread.currentThread().isInterrupted()) {
this.cancelExpirationRenewal(threadId, (Boolean)null);
}
}
}
}
创建定时任务的逻辑:
private void renewExpiration() {
ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
Timeout task = this.getServiceManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = (ExpirationEntry)RedissonBaseLock.EXPIRATION_RENEWAL_MAP.get(RedissonBaseLock.this.getEntryName());
if (ent != null) {
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
CompletionStage<Boolean> future = RedissonBaseLock.this.renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
RedissonBaseLock.log.error("Can't update lock {} expiration", RedissonBaseLock.this.getRawName(), e);
RedissonBaseLock.EXPIRATION_RENEWAL_MAP.remove(RedissonBaseLock.this.getEntryName());
} else {
if (res) {
RedissonBaseLock.this.renewExpiration();
} else {
RedissonBaseLock.this.cancelExpirationRenewal((Long)null, (Boolean)null);
}
}
});
}
}
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); // 这个值在之前讲过,续期用的,默认为 30s,因此这个任务默认每隔 10s 执行一次
ee.setTimeout(task);
}
}
每次定时任务触发,会执行 renewExpirationAsync
方法:
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
return this.evalWriteSyncedAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getRawName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
可以看到,本质仍然是一个 LUA 脚本:
-- 检查 KEY 是否存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('pexpire', KEYS[1], ARGV[1]); -- 存在则更新过期时间为 this.internalLockLeaseTime,就是默认那 30s
return 1; -- 成功返回
end;
return 0;
首先来看 package org.redisson
包下的 unlock
方法的具体实现:
public void unlock() {
try {
this.get(this.unlockAsync(Thread.currentThread().getId()));
} catch (RedisException var2) {
if (var2.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException)var2.getCause();
} else {
throw var2;
}
}
}
调用了方法 unlockAsync
:
public RFuture<Void> unlockAsync(long threadId) {
return this.getServiceManager().execute(() -> {
return this.unlockAsync0(threadId);
});
}
// 又调用了这段
private RFuture<Void> unlockAsync0(long threadId) {
CompletionStage<Boolean> future = this.unlockInnerAsync(threadId);
CompletionStage<Void> f = future.handle((res, e) -> {
this.cancelExpirationRenewal(threadId, res);
if (e != null) {
if (e instanceof CompletionException) {
throw (CompletionException)e;
} else {
throw new CompletionException(e);
}
} else if (res == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
throw new CompletionException(cause);
} else {
return null;
}
});
return new CompletableFutureWrapper(f);
}
接下来走 unlockInnerAsync
的逻辑:
protected final RFuture<Boolean> unlockInnerAsync(long threadId) {
// 生成一个会话ID用于锁的释放
String id = this.getServiceManager().generateId();
// 获取Redisson的配置对象
MasterSlaveServersConfig config = this.getServiceManager().getConfig();
// 计算超时的时间,这是基于配置中的超时时间、重试间隔和重试次数计算出的总时间
int timeout = (config.getTimeout() + config.getRetryInterval()) * config.getRetryAttempts();
timeout = Math.max(timeout, 1);
// 执行异步任务以释放锁
RFuture<Boolean> r = this.unlockInnerAsync(threadId, id, timeout);
// 使用CompletionStage处理异步操作的结果
CompletionStage<Boolean> ff = r.thenApply((v) -> {
CommandAsyncExecutor ce = this.commandExecutor;
// 判断commandExecutor是否是CommandBatchService的一个实例
if (ce instanceof CommandBatchService) {
// 如果是,创建一个新的CommandBatchService实例
ce = new CommandBatchService(this.commandExecutor);
}
// 执行DEL命令以删除锁相关的数据
((CommandAsyncExecutor)ce).writeAsync(this.getRawName(),
LongCodec.INSTANCE,
RedisCommands.DEL,
new Object[]{this.getUnlockLatchName(id)});
// 如果为CommandBatchService实例,则执行异步批量提交操作
if (ce instanceof CommandBatchService) {
((CommandBatchService)ce).executeAsync();
}
// 返回之前的异步任务的结果
return v;
});
// 将CompletionStage的执行结果包装为RFuture返回
return new CompletableFutureWrapper(ff);
}
重点在于执行异步任务释放锁的过程:
protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {
return this.evalWriteSyncedAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local val = redis.call('get', KEYS[3]); if val ~= false then return tonumber(val);end; if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); redis.call('set', KEYS[3], 0, 'px', ARGV[5]); return 0; else redis.call('del', KEYS[1]); redis.call(ARGV[4], KEYS[2], ARGV[1]); redis.call('set', KEYS[3], 1, 'px', ARGV[5]); return 1; end; ", Arrays.asList(this.getRawName(), this.getChannelName(), this.getUnlockLatchName(requestId)), new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId), this.getSubscribeService().getPublishCommand(), timeout});
}
可以看到,本质还是一个 LUA 脚本:
-- 尝试从KEYS[3]获取值
local val = redis.call('get', KEYS[3])
-- 如果val不为false,则返回它的数字表示
if val ~= false then
return tonumber(val)
end
-- 检查hash KEYS[1]中是否存在字段ARGV[3]
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
-- 如果不存在,则返回nil
return nil
end
-- 在hash KEYS[1]里将字段ARGV[3]的值减1,并将结果保存在counter中
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1)
-- 如果计数器仍然大于0
if (counter > 0) then
-- 设置hash KEYS[1]的过期时间为ARGV[2]毫秒
redis.call('pexpire', KEYS[1], ARGV[2])
-- 设置KEYS[3]的值为0,并设置过期时间为ARGV[5]毫秒
redis.call('set', KEYS[3], 0, 'px', ARGV[5])
return 0
else
-- 如果计数器不大于0,则删除hash KEYS[1]
redis.call('del', KEYS[1])
-- 执行ARGV[4](可能是发布某些消息)到KEYS[2]
redis.call(ARGV[4], KEYS[2], ARGV[1])
-- 设置KEYS[3]的值为1,并设置过期时间为ARGV[5]毫秒
redis.call('set', KEYS[3], 1, 'px', ARGV[5])
return 1
end
这里主要是很多这种的请求打过来,查到的 key 不存在的次数较多,导致数据库压力倍增
较为简单的解决方案是将这种查询不到的 key 设置为空值缓存并返回,缺点是占内存,实际上可以采用更加优雅的解决方案——添加布隆过滤器:
这里全部命中后的“可能”存在是因为存在哈希冲突的可能性:
技术上实现:
这里主要是针对某个 key 的,某个 key 恰好过期恰好大量请求打到这个 key 上
这里算是缓存击穿的升级版,大量的同一过期时间的 key 失效,结果大量请求过来,虽然查询的不是同一个 key,但未命中的流量占大部分