*分布式锁,是分布式应用中不可获缺的一个工具。
*典型的微服务架构中,在进行某些重要业务的时候,需要在整个微服务应用中对业务进行上锁。
*除此之外,即使是简单的单机项目,也有可能会同一个项目进行多部署,采用Apache或Nginx实现负债均衡,
在这种场景下,对互斥的业务操作也需要进行上锁处理。
● 一般地、在外面实现分布式锁用的比较多的是Zookeeper和Redis。 ● Spring Integration不需要你去关注它到底是基于什么存储技术实现的,它是面向接口编程,低耦合让你不需要关注底层实现。你要做的仅仅是做简单的选择,然后用相同的一套api即可完成分布式锁的操作。
1、已实现可重入、解决了死锁问题 可重入:同一个线程,可以多次获得相同的锁。这个应该是实现锁都应该去实现的特性。否则你的锁,很有可能自己把自己搞死了。
死锁问题:如果一个线程在竞争锁成功后,意外宕机了,导致没有主动去释放锁。那么锁在一般情况下,就会永久保留,这就造成了死锁。需要人工去处理,一般的,类似于使用Redis作为实现工具的,出现死锁的时候,就要手动去Redis里面找到这一个锁然后del 掉它。
2、缺点,无法续期锁 为了解决死锁问题,在redis作为实现工具的情况下,默认是采用redis的TTL设置过期事件来解决死锁问题。默认是60s,如果你加锁之后的业务操作,大于60秒,就会导致锁自动释放,其他线程此时可以竞争获得你的锁。但是实际上,你本应该还持有锁。
该框架没有锁续期,或者自定义锁过期时间的API,因此要非常注意你加锁的业务功能,务必要在60s内完成。
一般地、在其他大牛实现分布式锁时,会有另外一个线程持续监控获得锁的线程,如果线程没有主动释放锁,而又处于活跃状态(即还在处理业务),那么另外一个线程会帮助这个锁进行续期,以保证锁不会因为超时而自动释放。
=========================================================================================================
啥也不说,直接开干。
项目基于Maven+SpringBoot , 分布式锁的实现采用的是Redis 。(因此请为SpringBoot整合好Redis)
在需要使用锁的Bean里面 注入依赖
官方源码位置: https://github.com/spring-projects/spring-integration
Good Lucky!
在阅读了它的源码后,本人觉得有一些很值得学习的思想
从上文图片可知,RedisLockRegistry是通过new RedisLockRegistry(redisConnectionFactory, “redis-lock-test”);
传入的参数包括:
每个redisLockRegistry对象内部会维护一个线程安全的Map,即上面代码中第三行的locks。它的作用是用于保存名为lockKey对应的RedisLock对象。
可能会有人问computeIfAbsent以及RedisLock::new是什么来的
通过上面代码,你就已经获得了一个名为lockKey的Lock对象。下面进入重点环节
首先无论是lock还是trylock方法,他们只有无限阻塞和尝试一段时间竞争锁的区别。他们的工作核心流程都是:先竞争ReentrantLock,成功后再调用obtainLock()进行Redis端的锁竞争。 两步依次都成功后,才会返回true,表明你本次竞争锁成功。
@Override
public void lock() {
// 第一步,先进行ReentrantLock竞争,它的目的是在当前客户端中,不同线程之间先竞争一轮,决出最终竞争成功的那个线程
// 同时这ReentrantLock默认是nonFair非公平锁
this.localLock.lock();
// 运行到这里表明,当前线程已经在本客户端中竞争成功,但并不意味着,你的分布式锁就能成功
// 此时,你将代表当前客户端clientId,去Redis端进行竞争
while (true) {
try {
// 调用obtainLock()去redis端进行竞争,直到竞争成功
while (!obtainLock()) {
Thread.sleep(100); // 本次竞争失败,等待100ms再去尝试
}
// 执行到这里,表明Redis端也竞争成功了,此刻,你才是真正的分布式锁竞争成功!
break;
}
catch (InterruptedException e) {
}
catch (Exception e) {
// 注意如果在这里try catch出现任何异常,我们都需要把当前客户端的ReentrantLock进行unlock释放,防止死锁
this.localLock.unlock();
rethrowAsLockException(e);
}
}
}
/**
* 线程在自己的客户端中竞争成功后,代表当前客户端去Redis端进行分布式锁的竞争
*/
private boolean obtainLock() {
// 操作redis-lua的方法
Boolean success =
RedisLockRegistry.this.redisTemplate.execute(
/* lua脚本 */RedisLockRegistry.this.obtainLockScript,
/* keys */Collections.singletonList(this.lockKey),
/* argv[1] */RedisLockRegistry.this.clientId,
/* argv[2] */String.valueOf(RedisLockRegistry.this.expireAfter));
/*
lua脚本执行后,会返回true or false
true:分布式锁竞争成功
false:分布式锁竞争失败
*/
boolean result = Boolean.TRUE.equals(success);
if (result) {
// 如果true,那么就记录此刻的时间
this.lockedAt = System.currentTimeMillis();
}
return result;
}
/**
* KEYS[1] : lockKey 锁名
* ARGV[1] : clientId 客户端id
* ARGV[2] : expireAfter 过期时间毫秒级
*/
// 调用Redis GET命令,获取lockKey对应的value值
local lockClientId = redis.call('GET', KEYS[1])
if lockClientId == ARGV[1] then
// 如果lockClientId不为空,且value值等于传入的clientId,说明它就是这个锁的锁主
// 出现这种情况,说明这是第N(N>1)次重入
// 在Redis端会为这次重入,重置KV的TTL
redis.call('PEXPIRE', KEYS[1], ARGV[2]) // 调用PEXPIRE为 lockkey设置过期时间(毫秒)
return true
elseif not lockClientId then
// 如果lockClientId为空,则会进入当前elseif
// 出现这种情况,说明没有其他客户端持有该锁,所以该value才会为nil(空)
// 表明你这次竞争锁成功
redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])
return true
end
// 如果lockClientId不为空,又不等于当前clientId
// 那么就是竞争失败
return false
从上面lock()方法的源码可知,解决可重入问题是通过ReentrantLock来辅助实现的。而解决死锁问题则是通过Redis的TTL实现。它的工作思想比较巧妙,总结为以下一张图:
在这里,简单的说: 假设有三所学校A,B,C。每所学校有3个教师A1,A2,A3,B1,B2,B3,C1,C2,C3。 一共9个老师去教育局请教育局长来学校调研。 在这里,教育局局长就是共享资源,它每次肯定只能去一所学校参观 首先,每所学校的3名教师会先进行内部竞争,决出一名教师代表自己的学校去教育局。最终每所学校的那名教师代表自己的学校,到达教育局,教育局局长的接待工作由秘书负责,秘书按照先后顺序接见A,B,C三所学校的代表教师。如果教育局局长有空,则由最先到的教师带走教育局局长去它的学校调研。调研结束后,教育局局长返回教育局,再由第二所学校的教师带走教育局局长。
我想这样解释就很生动的模拟了上面分布式锁的竞争过程。 ⭐思考:为什么不能是9个教师直接到教育局进行先后竞争呢? 回答:
⭐回归整体的思考:为什么不能是9个线程,直接到Redis端进行竞争呢? 个人分析: 首先,如果你有了解过Redis实现的分布式锁,你可以从百度上看到很多别人的文章。最简单也挺有效的一种方式,就是利用Redis的setnx命令以及Redis本身单线程串行处理所有命令的特性,来实现一个可用的分布式锁。他们这些锁都有一个特点,就是每个线程为一个个体,到达Redis进行竞争。
之所以这里,作者要这样设计,我想应该出于以下几点优化:
1. 网络开销:同一个应用程序3个线程,就需要发送3条命令到Redis,并且有其中2条命令是肯定会失败的 2. 线程自旋开销:如果竞争失败,像lock的逻辑就是不断去重试直到成功,那么每次重试都需要发送一次Redis命令,每次都是网络开销。但是作者现在是,先是内部JVM层面的竞争,竞争成功后就会由3个线程变为1个线程去进行会消耗网络的自旋。而另外2个线程则只是消耗CPU的自旋。倘若是3个线程都去redis进行竞争,那么就是3个CPU自旋+3个网络消耗。而现在只是3个CPU自旋+1个网络消耗 3. Redis性能:Redis肯定不仅仅是为了解决分布式锁而存在的,它的功能有很多。9个线程去让redis进行工作,和3个线程去让redis进行工作,对redis的性能消耗肯定是不同的。(当然这里3和9肯定可以忽略不计了,但是毕竟这里简单举例子,放大十倍,百倍,千倍就是一笔大的开销) 4.为了实现可重入:我想这个才是这项设计比较核心的考虑。对比网上没有可重入功能的redis分布式锁,可以看到都是没有ReentrantLock的辅助的。但是我们可知,可重入性几乎是锁必备的特性,而ReentrantLock是Java实现好的一款极具生产价值的可重入锁。因此作者为了利用ReentrantLock实现可重入性,而由此衍生出这样的设计考虑。
(当然上面都是个人分析罢了,实现可重入应该还有很多方式,不过在看了作者的源码后,感觉这是一个非常不错的考虑)
@Override
public void unlock() {
if (!this.localLock.isHeldByCurrentThread()) {
// 先判断当前ReentrantLock的锁主是不是当前线程
throw new IllegalStateException("You do not own lock at " + this.lockKey);
}
if (this.localLock.getHoldCount() > 1) {
// 判断可重入标记,如果大于1,说明重入了getholdCount()次,这一次unlock()只是让计数-1,而不会真正释放Redis端的分布式锁
this.localLock.unlock();
return;
}
try {
if (!isAcquiredInThisProcess()) {
// 这个方法封装了redis端的判断,它会判断Redis端的锁是不是你持有的
// 一般情况下,这个方法都会返回true,则跳过这次报错
// 如果代码进入此报错,原因主要是,每个锁的过期时间默认60s,如果你持有锁的情况下超过60s后再unlock(),
// 此时锁早就已经过期丢弃,甚至被其他线程竞争掉,所以你的unlcok会失败
throw new IllegalStateException("Lock was released in the store due to expiration. " +
"The integrity of data protected by this lock may have been compromised.");
}
// 调用redis del命令删除KV。也就是释放这次分布式锁
if (Thread.currentThread().isInterrupted()) {
RedisLockRegistry.this.executor.execute(() ->
RedisLockRegistry.this.redisTemplate.delete(this.lockKey));
}
else {
RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
}
if (logger.isDebugEnabled()) {
logger.debug("Released lock; " + this);
}
}
catch (Exception e) {
ReflectionUtils.rethrowRuntimeException(e);
}
finally {
// 最后记得把本地的ReentrantLock进行unlock(),以让其他等待线程进行竞争
this.localLock.unlock();
}
}