本期我们继续探索 Redisson 的分布式同步组件。本篇聚焦在 Semaphore(信号量) 和 CountDownLatch(倒数闩),看看它们在单机环境中常用的同步思想是如何“穿越”到分布式场景,并依赖 Redis 来保证一致性的。
在普通的 Java 线程编程中,Semaphore 常用于控制对特定资源的同时访问数,而 CountDownLatch 则常用于等待其他线程把某些工作执行完毕后再继续。但若应用部署在分布式环境,单机版本的对象自然约束在某一 JVM 中,无法跨节点协同控制。此时我们可以借助 Redisson 提供的 RSemaphore 和 RCountDownLatch,让这两种同步模式在分布式下也能“有法可依”。
可以限制同时访问共享资源(如数据库连接、缓存访问、某段关键逻辑等)的并发数量。
1.可动态调整可用许可数量;
2.在分布式集群环境下,任何一个节点都能正确感知许可的剩余数量。
类似“倒计时”,需要等待某些操作完成后才能继续。例如:只有当分布式系统中多个子任务都执行完成后,主流程才能继续执行。
1.可设置初始倒计数值,不同节点根据执行进度减少计数。
2.当计数归零时,就会唤醒等待该闩的节点。
接下来,让我们直击它们在源码中的实现方式。
Redisson 提供的分布式信号量接口 RSemaphore
在底层主要依赖 Redis 的 Lua 脚本 和 Redis key 自增/自减 这类能力:
acquire
) :
Lua 脚本会检查是否存在足够的许可值(通常是 Redis 中的某个 key 记录当前可用许可数);
如果足够,则进行减操作;否则阻塞或返回失败。release
) :
将 Redis 中存储的许可数 +1,并通知阻塞挂起中的线程(如果有)。RedissonSemaphore
源码示意,整体只展示关键思路:
public class RedissonSemaphore extends RedissonBaseDistributedObject implements RSemaphore {
public RedissonSemaphore(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
@Override
public void acquire() throws InterruptedException {
tryAcquire(-1, TimeUnit.MILLISECONDS);
}
@Override
public boolean tryAcquire(long waitTime, TimeUnit unit) throws InterruptedException {
return get(tryAcquireAsync(waitTime, unit, 1));
}
@Override
public void release() {
get(releaseAsync(1));
}
// ...
}
RedissonSemaphore
依托一些异步方法,例如 tryAcquireAsync
、releaseAsync
等,这些方法内部最终会组装 Lua 脚本并发送到 Redis,来做自增自减和阻塞通知。
在 tryAcquireAsync
中,一般会:
key = key - 1
;tryAcquire
的超时时间和模式)。private RFuture<Boolean> tryAcquireAsync(long waitTime, TimeUnit unit, int permits) {
// Lua脚本核心:判断key是否 >= permits,否则阻塞/返回
// 执行成功后减去相应数量
return commandExecutor.evalWriteAsync(
getName(),
RedisCommands.EVAL_BOOLEAN,
"local currVal = redis.call('get', KEYS[1]); " +
"if (currVal ~= false and tonumber(currVal) >= tonumber(ARGV[1])) then " +
"redis.call('decrby', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
permits);
}
此处脚本只是示意,真实的 Redisson 源码会更全面,比如要处理阻塞和等待通知的逻辑。
与获取许可对立,release
时关键就是将 Redis 中存储的许可数 +1,或一次性释放多个许可:
private RFuture<Long> releaseAsync(int permits) {
// Lua脚本:对key执行incr操作
return commandExecutor.evalWriteAsync(
getName(),
RedisCommands.EVAL_LONG,
"return redis.call('incrby', KEYS[1], ARGV[1]);",
Collections.<Object>singletonList(getName()),
permits);
}
当可用许可数从 0 变为正数,会通知等待获取许可的线程。
Redisson 的 RCountDownLatch
和 Semaphore 类似,同样通过 Redis 来维护一个“倒计数值”。当这个值大于 0 时,需要阻塞调用 await()
的线程;当计数归零后,所有阻塞线程会被唤醒。
源码示意:
public class RedissonCountDownLatch extends RedissonBaseDistributedObject implements RCountDownLatch {
public RedissonCountDownLatch(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
@Override
public void await() throws InterruptedException {
get(awaitAsync());
}
@Override
public void countDown() {
get(countDownAsync());
}
@Override
public boolean trySetCount(long count) {
return get(trySetCountAsync(count));
}
// ...
}
我们可以看到几个核心方法:
trySetCount
:初始化或重置倒计数值。countDown
:使计数值 -1。await
:阻塞等待,直到计数值清零。trySetCount
负责在 Redis 中设置 key 的初始计数值:
private RFuture<Boolean> trySetCountAsync(long count) {
// 如果key不存在,则设置它为 count
// 如果已经存在,则返回false 表示无法再次设置
return commandExecutor.evalWriteAsync(
getName(),
RedisCommands.EVAL_BOOLEAN,
"if redis.call('exists', KEYS[1]) == 0 then " +
"redis.call('set', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
count
);
}
若 key 已存在,这次设置就会失败,说明已经有倒计数在进行中。
countDown
的 Lua 脚本则做 decrement 操作。如果计数归零,就发布一个消息或事件,唤醒所有 await()
中阻塞的线程(它们往往在 Redis 中通过订阅频道或等待队列的方式实现阻塞唤醒)。
private RFuture<Long> countDownAsync() {
return commandExecutor.evalWriteAsync(
getName(),
RedisCommands.EVAL_LONG,
"local val = redis.call('decr', KEYS[1]); " +
// 如果归零,发布消息到特定 channel,唤醒等待方
"if val <= 0 then " +
"redis.call('publish', KEYS[2], '0'); " +
"end;" +
"return val;",
Arrays.asList(getName(), getChannelName(getName()))
);
}
在真实代码中,getChannelName
通常会加工出一个类似 redisson_countdownlatch:{...}
的频道名称,用于发布/订阅机制来进行分布式唤醒。
await
在分布式场景下,会订阅 Redis 的特定频道。当倒计数归零时,服务器端执行 PUBLISH
,客户端这边接收到消息后就会解除阻塞,完成 CountDownLatch 的分布式同步。
与之前说到的分布式读写锁类似,Semaphore 与 CountDownLatch 的实现同样依赖 Redis 原子操作 + Lua 脚本:
INCRBY
、DECRBY
、PUBLISH
等,都能够在服务器端一次性完成。publish
到对应的 channel,唤醒在不同节点上阻塞的线程。利用这些特性,Redisson 让一台 JVM 中的 tryAcquire()
、countDown()
等操作,可以对远程 Redis 的数据产生影响,并同步反馈给其他 JVM 中等待的线程,实现分布式级别的互联和协作。
Semaphore 与 CountDownLatch 是高并发编程中两种常见的“并发工具类”,Redisson 在其基础上通过以下手段将它们延伸到分布式维度:
这样,我们就能在分布式环境中“像用本地工具类一样”去使用信号量或倒数闩。通过阅读源码,我们可以更好地理解其内部原理,从而在分布式系统设计和排错中游刃有余。希望本文能带给你新的收获,下一期再见!
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。