public static int count = 0;
private static int expireTime = 50;
private static String lockKey = "desc";
private static String lockValue = "lockValue";
public void decr() {
if (RedisPoolUtil.setNxPx(lockKey, lockValue, expireTime)) {
try {
//业务处理
} catch (Exception e) {
log.info("操作error:{}", e.getMessage(), e);
} finally {
RedisPoolUtil.del(lockKey);
}
}
}
使用redis分布式锁重要的3个点:①使用redis提供的原子命令,对应jedis给的api是 public String set(String key, String value, String nxxx, String expx, long time) ②设置合适的锁过期时间③ 在finally块释放锁。三点里最难的就是第二点了,里边有一个”合适“,多长时间算是合适呢?跟业务处理时间有很大的关系,一旦设置时间不当,就会出现问题:业务未处理完时,锁过期了怎么办?下面这段代码可以复现这个问题:
public void decr() {
if (RedisPoolUtil.setNxPx(lockKey, lockValue, expireTime)) {
try {
if (count < 10) {
//业务处理...
Thread.sleep(10);
count++;
}
//模拟意外延迟
if (System.currentTimeMillis() % 2 == 0) {
Thread.sleep(15);
}
} catch (Exception e) {
log.info("操作error:{}", e.getMessage(), e);
} finally {
RedisPoolUtil.del(lockKey);
}
}
}
public void result() {
log.info("拿到锁进行操作:{}",count);
}
使用多线程模拟并发,调用该方法,最终输出count的值
[main] INFO blog20201215.MyLock - 拿到锁进行操作:11
可以看到有count本应该最大增到10,但是因为锁过期,同时有不止一个线程进入了业务处理代码块,count增到了11,分布式锁失效。该情况会导致两个问题,一个是先进入代码块的线程A未完成业务操作,同时后来线程B进入,造成错误。二是后来的线程B未完成操作之前,先进入代码块的线程A会释放掉后进入的线程设置的分布式锁,导致恶性循环。
所以我们可以将过期时间设置的长一点,防止过期。但是多长算长呢?很难把控,于是我们可以在拿到分布式锁之后,开始业务操作之前,启动一个线程给拿到锁的该线程”续命“,也就是只要该线程未完成业务处理,子线程就去增加分布式锁的过期时间。demo如下:
public void decr1() {
if (RedisPoolUtil.setNxPx(lockKey, "1", expireTime)) {
Thread t = null;
try {
//开启续命线程
t = daemon(Thread.currentThread());
if (count < 10) {
//业务处理...
Thread.sleep(10);
count++;
}
//模拟意外延迟
if (System.currentTimeMillis() % 2 == 0) {
Thread.sleep(15);
}
} catch (Exception e) {
log.info("操作error:{}", e.getMessage(), e);
} finally {
if (Objects.nonNull(t)){
//关续命线程
t.interrupt();
}
RedisPoolUtil.del(lockKey);
}
}
}
private Thread daemon(Thread parent){
Thread t = new Thread(new Runnable() {
@Override
public void run() {
boolean end = false;
do {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
log.info("{}终止了自己的辅助线程",parent.getName());
end = true;
}
if (!end){
log.info("给{}续命",parent.getName());
RedisPoolUtil.setEx(lockKey,parent.getName(),1);
}
}while (parent.isAlive() && !end);
}
});
t.start();
return t;
}
有了续命线程,锁不会提前过期,这样也就不会有刚才提到的第二个问题了(A线程释放了B线程的分布式锁)。但是这样的操作添加了系统的复杂性,也消耗了更多的资源。如果不使用续命线程,第二个问题该怎么解决呢?
A线程释放了B线程的锁,下面这个demo进行了复现:
public void decrV2() {
if (RedisPoolUtil.setNxPx(lockKey, Thread.currentThread().getName(), expireTime)) {
try {
if (count < 10) {
//业务处理...
Thread.sleep(10);
count++;
}
//模拟意外延迟
if (System.currentTimeMillis() % 2 == 0) {
Thread.sleep(15);
}
} catch (Exception e) {
log.info("操作error:{}", e.getMessage(), e);
} finally {
String value = RedisPoolUtil.get(lockKey);
if (!Thread.currentThread().getName().equals(value)) {
log.info("线程{}释放了{}的锁",Thread.currentThread().getName(),value);
}
RedisPoolUtil.del(lockKey);
}
}
}
模拟并发调用,可以看到部分输出如下:
[pool-1-thread-17] INFO blog20201215.MyLock - 线程pool-1-thread-17释放了pool-1-thread-59的锁
[pool-1-thread-59] INFO blog20201215.MyLock - 线程pool-1-thread-59释放了pool-1-thread-81的锁
[pool-1-thread-20] INFO blog20201215.MyLock - 线程pool-1-thread-20释放了pool-1-thread-12的锁
[pool-1-thread-58] INFO blog20201215.MyLock - 线程pool-1-thread-58释放了pool-1-thread-12的锁
[pool-1-thread-40] INFO blog20201215.MyLock - 线程pool-1-thread-40释放了pool-1-thread-86的锁
[pool-1-thread-50] INFO blog20201215.MyLock - 线程pool-1-thread-50释放了pool-1-thread-68的锁
[pool-1-thread-81] INFO blog20201215.MyLock - 线程pool-1-thread-81释放了pool-1-thread-68的锁
[pool-1-thread-85] INFO blog20201215.MyLock - 线程pool-1-thread-85释放了pool-1-thread-32的锁
[pool-1-thread-72] INFO blog20201215.MyLock - 线程pool-1-thread-72释放了pool-1-thread-32的锁
[pool-1-thread-64] INFO blog20201215.MyLock - 线程pool-1-thread-64释放了pool-1-thread-32的锁
[pool-1-thread-86] INFO blog20201215.MyLock - 线程pool-1-thread-86释放了pool-1-thread-32的锁
[pool-1-thread-87] INFO blog20201215.MyLock - 线程pool-1-thread-87释放了pool-1-thread-17的锁
[main] INFO blog20201215.MyLock - 拿到锁进行操作:12
[pool-1-thread-73] INFO blog20201215.MyLock - 线程pool-1-thread-73释放了pool-1-thread-52的锁
[pool-1-thread-95] INFO blog20201215.MyLock - 线程pool-1-thread-95释放了pool-1-thread-52的锁
解决这个问题,可以规定:解锁只能上锁人。A线程上的锁,只能A线程来释放。只要我们将lockKey的value设置成与该线程相关的值就可以了,在释放锁时进行判断,不是自己的锁,就不释放。这样可以保证不会产生恶心循环。
public void decrV3() {
if (RedisPoolUtil.setNxPx(lockKey, Thread.currentThread().getName(), expireTime)) {
try {
if (count < 10) {
//业务处理...
Thread.sleep(10);
count++;
}
//模拟意外延迟
if (System.currentTimeMillis() % 2 == 0) {
Thread.sleep(15);
}
} catch (Exception e) {
log.info("操作error:{}", e.getMessage(), e);
} finally {
String value = RedisPoolUtil.get(lockKey);
if (!Thread.currentThread().getName().equals(value)) {
log.info("线程{}不可释放{}的锁",Thread.currentThread().getName(),value);
}else {
RedisPoolUtil.del(lockKey);
}
}
}
}
以上代码中释放锁的代码还是有些问题,因为其中涉及到了两个redis操作①获取当前锁的value ②比较之后,再执行del操作。可能会发生一种情况,当执行完第一步之后,第二步还没来得及执行,锁过期了,其他线程设置了自己的锁,于是再次出现线程A释放线程B的情况。我们可以使用Lua脚本将这两个操作合为一个原子操作,如下:
public static boolean releaseDistributedLock(String lockKey, String value) {
Jedis jedis = null;
Object result;
try{
jedis = RedisPool.getJedis();
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(value));
}catch (Exception e) {
log.error("releaseDistributedLock error key:{},value:{}", lockKey, value, e);
RedisPool.returnBrokenResource(jedis);
return false;
}
RedisPool.returnResource(jedis);
if ("OK".equals(result)) {
return true;
}
return false;
}
解锁时调用以上这个方法就可以了
public void decrV4() {
if (RedisPoolUtil.setNxPx(lockKey, Thread.currentThread().getName(), expireTime)) {
try {
if (count < 10) {
//业务处理...
Thread.sleep(10);
count++;
}
//模拟意外延迟
if (System.currentTimeMillis() % 2 == 0) {
Thread.sleep(15);
}
} catch (Exception e) {
log.info("操作error:{}", e.getMessage(), e);
} finally {
if (!RedisPoolUtil.releaseDistributedLock(lockKey,Thread.currentThread().getName())) {
log.info("线程{}不可释放{}的锁",Thread.currentThread().getName(), RedisPoolUtil.get(lockKey));
}
}
}
}
再次模拟并发调用,输出如下:
[pool-1-thread-24] INFO blog20201215.MyLock - 线程pool-1-thread-24不可释放pool-1-thread-42的锁
[pool-1-thread-42] INFO blog20201215.MyLock - 线程pool-1-thread-42不可释放pool-1-thread-96的锁
[pool-1-thread-27] INFO blog20201215.MyLock - 线程pool-1-thread-27不可释放pool-1-thread-96的锁
[pool-1-thread-96] INFO blog20201215.MyLock - 线程pool-1-thread-96不可释放pool-1-thread-77的锁
[pool-1-thread-77] INFO blog20201215.MyLock - 线程pool-1-thread-77不可释放pool-1-thread-100的锁
[pool-1-thread-46] INFO blog20201215.MyLock - 线程pool-1-thread-46不可释放pool-1-thread-17的锁
[pool-1-thread-100] INFO blog20201215.MyLock - 线程pool-1-thread-100不可释放pool-1-thread-88的锁
[pool-1-thread-17] INFO blog20201215.MyLock - 线程pool-1-thread-17不可释放pool-1-thread-79的锁
[pool-1-thread-79] INFO blog20201215.MyLock - 线程pool-1-thread-79不可释放pool-1-thread-15的锁
[pool-1-thread-88] INFO blog20201215.MyLock - 线程pool-1-thread-88不可释放pool-1-thread-15的锁
[main] INFO blog20201215.MyLock - 拿到锁进行操作:10
[pool-1-thread-59] INFO blog20201215.MyLock - 线程pool-1-thread-59不可释放pool-1-thread-87的锁
[pool-1-thread-87] INFO blog20201215.MyLock - 线程pool-1-thread-87不可释放null的锁
可以看到,没出现什么问题,但是锁提前释放的问题任然存在。(当然分布式锁应用于分布式环境,目前我是单机测试,所以value采用了线程名称,更严谨一些应该生成唯一id,作为锁的value)
一些业务场景可能会需要分布式锁具有可重入性,可以使用通过ThreadLocal记录当前线程获取到的锁,记录获取次数,获取锁时先检查当前线程是否已经获取到了锁,如果没有,则通过redis去获取锁,拿到之后在本线程记录已获取到锁,次数为1;第二次获取锁时,如果发现已经获取到了锁,直接给获取次数加1就可以,同理在释放锁时也需要将次数标识进行递减,从而实现一个可重入锁。
生产环境,redis一般都是一个集群,有多台机器,当记录我们加锁的那台redis挂了怎么办?这种情况会导致,其他节点加锁成功,分布式锁失败。我想到两种方案:①redis集群采取主从架构,比如三主三从,当主节点挂了,从节点可以顶上,从而保证业务正常,但也有可能从节点没有将主节点的锁信息同步过去,这种情况只能采取第二种方法了。②加锁是采取RedLock的思路,加锁时,给集群中(n/2+1)个节点加锁成功才算获取锁成功,解锁时给redis集群中的所有机器解锁。