前言
在单体架构中,我们处理并发的手段有多种,例如synchronized或使用ReentrantLock等常用手段,但是在分布式架构中,上述所说的就不能解决某些业务的并发问题了,那么接下来我们就开始聊聊分布式锁。
什么是分布式锁
在介绍分布式锁之前,我们先由浅入深了了解一下,线程锁和进程锁。
线程锁:
主要用来给方法、代码块加锁。当某个方法或代码在同一时刻仅有一个线程执行该方法或该代码段。线程锁只在同一JVM中有效果,因为线程锁的实现在根本上是依靠线程之间共享内存实现的,比如Synchronized、Lock等。
进程锁:
为了控制同一操作系统中多个进程访问某个共享资源,因为进程具有独立性,各个进程无法访问其他进程的资源,因此无法通过synchronized等线程锁实现进程锁。
分布式锁:
分布式锁,是指在分布式的部署环境下,通过锁机制来让多客户端互斥的对共享资源进行访问。
分布式锁的几种实现方式
为啥要使用分布式锁
在聊聊为啥要使用分布式锁之前,我们先看一下如下图架构模式
上图,是一个集群单体架构模式,我们现在来设想秒杀一个业务场景,首先我们秒杀肯定得有商品,那么我们的商品库存数量是预存放到redis中,然后用户请求的时候,会经过nginx的负载均衡轮询之后,将请求落在其中某一台服务器上,然后执行用户抢购商品业务逻辑,我这里写一段简单演示代码
@Overridepublic String panicBuying() {
Integer orderCount = Integer.valueOf(redisTemplate.opsForValue().get("sunny_order")); if (orderCount > 0) { int retCount = orderCount - 1; redisTemplate.opsForValue().set("sunny_order", retCount + ""); System.out.println("库存扣减成功,剩余库存:" + retCount); } else { System.out.println("库存不存"); return "库存不足"; }
return "抢购成功";}
我们分析如上代码,和上图的架构模式,在高并发场景下会不会存在问题?
答案肯定是会的,我们现在是集群单体架构,在高并发场景下,通过ngxin负载轮询,当然可能会有多个请求同时进入方法,然后同一时刻调用redis的api获取库存的值,那么这个时候多个用户获取的库存数量是一样的,然后减掉库存,在设置进去数据肯定就不对了。
好,我们既然知道有bug了,那我们是不是可以使用synchronized锁,在同一时刻只让抢到锁的用户才能操作库存呢,见如下演示代码
@Overridepublic String panicBuying() {
synchronized (this) { Integer orderCount = Integer.valueOf(redisTemplate.opsForValue().get("sunny_order")); if (orderCount > 0) { int retCount = orderCount - 1; redisTemplate.opsForValue().set("sunny_order", retCount + ""); System.out.println("库存扣减成功,剩余库存:" + retCount); } else { System.out.println("库存不存"); return "库存不足"; } }
return "抢购成功";}
看到如上代码,估计有小伙伴已经看出来有问题了,那我再接着分析下,为什么如上代码加上了锁还是会有问题?
我们结合上面的图仔细想想,synchronized锁只是针对单台的jvm请求有效,但是集群环境下,通过nginx轮询转发,且高并发情况下,肯定会存在多个请求同一时刻将请求分配到两台服务器上,那这个时候就算有synchronized锁,也只能各自锁住各自服务器的jvm请求实例,还是会出现请求获取同样的库存数量,导致数据不对,不过也是稍微解决了大量请求进来的情况
既然我们知道synchronized锁已经无法解决我们的问题了,那我们可以使用redis分布式锁解决呀,在演示如何使用redis解决之前,我先介绍一下redis的setnx命令
对于redis的set命令,相信小伙伴是更加熟悉不过了,那么setNx命令其实跟set命令差不多,他们的区别在于:
我们了解完setNx命令,再来看一下通过setNx实现分布式锁的演示代码
@Overridepublic String panicBuying() {
//获取锁 Boolean lockResult = redisTemplate.opsForValue().setIfAbsent("sunny_lock", "sunny");
if (!lockResult) { return "锁已被占用,请稍后重试"; }
Integer orderCount = Integer.valueOf(redisTemplate.opsForValue().get("sunny_order")); if (orderCount > 0) { int retCount = orderCount - 1; redisTemplate.opsForValue().set("sunny_order", retCount + ""); System.out.println("库存扣减成功,剩余库存:" + retCount); } else { System.out.println("库存不存"); return "库存不足"; }
//释放锁 redisTemplate.delete("sunny_lock");
return "抢购成功";}
我们现在使用了Redis分布式锁,多个jvm实例同时setnx,只有一个jvm能够成功,那么就解决了集群下多台服务器锁资源的问题了,就意味着不会出现上面那种多个请求进来,同时减掉我们的库存操作了,那我们在看看还不会有问题呢?
答案是有的,假如我获取到锁之后,在执行业务逻辑的时候发生了报错,导致我无法进行释放锁,那后面的用户就永远无法再次继续抢购商品了对吧。
针对这么个情况,我猜有小伙伴已经想到了解决方案,那我们直接异常捕获下嘛,然后通过finally无论是否报错,都进行执行释放锁操作,看如下演示代码
@Overridepublic String panicBuying() { //获取锁 Boolean lockResult = redisTemplate.opsForValue().setIfAbsent("sunny_lock", "sunny");
if (!lockResult) { return "锁已被占用,请稍后重试"; }
try { Integer orderCount = Integer.valueOf(redisTemplate.opsForValue().get("sunny_order")); if (orderCount > 0) { int retCount = orderCount - 1; redisTemplate.opsForValue().set("sunny_order", retCount + ""); System.out.println("库存扣减成功,剩余库存:" + retCount); } else { System.out.println("库存不存"); return "库存不足"; } } catch (Exception e) { e.printStackTrace(); } finally { //释放锁 redisTemplate.delete("sunny_lock"); }
return "抢购成功";}
机智如我想到了这种办法解决如上这个问题,而此刻有小伙伴要说,你这样处理固然是可以的,但是有没有想过你获取到锁后,然后在执行业务逻辑过程中,服务器发生了宕机了,你的锁又无法释放掉了,那可咋办?估计此时心中一万个emmmm路过吧,不过没办法,既然出现问题我们总得解决是吧。
哼哼,既然你服务器无情且别怪我黎明大大不义了,那我直接给我的锁设置一个过期时间,看你服务器宕机还能不能阻止我释放掉锁,看如下演示代码
@Overridepublic String panicBuying() { //获取锁 Boolean lockResult = redisTemplate.opsForValue().setIfAbsent("sunny_lock", "sunny", 10, TimeUnit.SECONDS);
if (!lockResult) { return "锁已被占用,请稍后重试"; }
try { Integer orderCount = Integer.valueOf(redisTemplate.opsForValue().get("sunny_order")); if (orderCount > 0) { int retCount = orderCount - 1; redisTemplate.opsForValue().set("sunny_order", retCount + ""); System.out.println("库存扣减成功,剩余库存:" + retCount); } else { System.out.println("库存不存"); return "库存不足"; } } catch (Exception e) { e.printStackTrace(); } finally { //释放锁 redisTemplate.delete("sunny_lock"); }
return "抢购成功";}
好啦,我已经将我的锁释的过期时间设置为10秒自动过期了,就算你服务器宕机了我也不怕了,那么此时我们的程序还有bug么?
黎明大大告诉你,还是有的,为啥这么说呢?假如说我第一个请求进来,拿到了锁,本来我执行该业务在5秒内就能执行完,但是莫名奇妙要花15秒时间才能执行完,那我设置的锁自动失效时间,就会将该锁释放掉,第二个请求进来拿到了锁,然后执行业务逻辑,但是还没有执行完,第一个请求执行完业务逻辑了,把锁给释放掉了,那我第二个请求拿到的锁,被第一个请求给释放掉了,接着第三个请求又进来了,执行业务操作还没执行完,第二个请求执行完逻辑,又把锁释放掉了,然后形成了一个闭环操作,在高并发场景下,可能会导致锁长久失效的问题,不知道小伙伴能否get到我说的这个点?如果还没get到点的小伙伴,重复多看几遍就能理解了。
这个时候,黎明大大灵机一动,那很简单啊,我给拿到的锁设置的一个特定的clientId或者随机的值也行,然后在释放锁的时候,获取锁的value,判断一下value是否是我设置的value,如果是的话才
能释放锁,看代码演示(A线程创建锁 被B线程释放掉了,所以这里是解决谁创建的锁,就应该被谁给释放掉)
@Overridepublic String panicBuying() {
String uuid = UUID.randomUUID().toString();
//获取锁 Boolean lockResult = redisTemplate.opsForValue().setIfAbsent("sunny_lock", uuid, 10, TimeUnit.SECONDS);
if (!lockResult) { return "锁已被占用,请稍后重试"; }
try { Integer orderCount = Integer.valueOf(redisTemplate.opsForValue().get("sunny_order")); if (orderCount > 0) { int retCount = orderCount - 1; redisTemplate.opsForValue().set("sunny_order", retCount + ""); System.out.println("库存扣减成功,剩余库存:" + retCount); } else { System.out.println("库存不存"); return "库存不足"; } } catch (Exception e) { e.printStackTrace(); } finally { String uidValue = redisTemplate.opsForValue().get("sunny_lock"); if (uidValue.equals(uuid)) { //释放锁 redisTemplate.delete("sunny_lock"); } }
return "抢购成功";}
额....不过以上解决方案貌似并没有解决我们锁提前过期的问题哦,没关系,黎明大大还有一个思路就是当我们请求进入方法拿到了锁之后,我们此时再额外开一个分线程,然后在这个分线程里面写一个逻辑,该逻辑就是整一个自旋锁,然后在起个定时任务每隔几秒中去获取该锁是否还存在,如果存在则对该锁的过期时间进行续命,也就是加锁的过期时间啦,不过给锁添加过期时间是有讲究的哦,一般情况下是 锁的过期时间 / 3 = 锁续命的时间那么这样就能够解决锁提前失效的问题啦,看代码演示
@Overridepublic String panicBuying() { String uuid = UUID.randomUUID().toString(); String lockName = "sunny_lock";
boolean lock = lock(lockName,uuid); if (!lock) { return "抢占锁失败"; }
try { Thread.sleep(15000); Integer orderCount = Integer.valueOf(redisTemplate.opsForValue().get("sunny_order")); if (orderCount > 0) { int retCount = orderCount - 1; redisTemplate.opsForValue().set("sunny_order", retCount + ""); System.out.println("库存扣减成功,剩余库存:" + retCount); } else { System.out.println("库存不存"); return "库存不足"; } } catch (Exception e) { e.printStackTrace(); } finally { String uidValue = redisTemplate.opsForValue().get("sunny_lock"); if (uidValue.equals(uuid)) { //释放锁 System.out.println("释放锁"); redisTemplate.delete("sunny_lock"); } }
return "抢购成功";}
//获取锁private boolean lock(String lockKey,String uuid) { while (true) { //获取锁 Boolean lockResult = redisTemplate.opsForValue().setIfAbsent("sunny_lock", uuid, 10, TimeUnit.SECONDS);
if (!lockResult) { return false; }
Thread thread = new Thread(new Runnable() { @Override public void run() { System.out.println("执行~"); Timer timer = new Timer();
TimerTask timerTask = new TimerTask() { @Override public void run() { //锁存在则将生存时间重置为10s String o = (String) redisTemplate.opsForValue().get(lockKey); if (uuid.equals(o)) { //重新设置时间为10秒 redisTemplate.expire(lockKey, 10, TimeUnit.SECONDS); } else { timer.cancel(); } } }; //定时器启动3s后执行第一次,之后每隔3s执行一次 timer.schedule(timerTask, 3000L, 3000L); } });
thread.run(); break; } return true;}
解决了以上这些问题,其实在我们平常开发中,完全没有必要再写的这么麻烦了,因为有现成的框架已经帮你集成好这些代码了,甚至还会比我们写的更加严谨,比如redisson框架,该框架我不做多解释了,有不了解的可以自己百度搜寻类似的文章,我这里简单演示一下redisson如何获取锁和释放锁的
@Overridepublic String panicBuying() { String lockName = "sunny_lock";
RLock lock = redisson.getLock(lockName); lock.lock();
try { Integer orderCount = Integer.valueOf(redisTemplate.opsForValue().get("sunny_order")); if (orderCount > 0) { int retCount = orderCount - 1; redisTemplate.opsForValue().set("sunny_order", retCount + ""); System.out.println("库存扣减成功,剩余库存:" + retCount); } else { System.out.println("库存不存"); return "库存不足"; } } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); }
return "抢购成功";}
好啦,redis实现分布式的锁坑基本上都已经踩完了。
我是黎明大大,我知道我没有惊世的才华,也没有超于凡人的能力,但毕竟我还有一个不屈服,敢于选择向命运冲锋的灵魂,和一个就是伤痕累累也要义无反顾走下去的心。