在单体项目中jvm中的锁即可完成需要,但是微服务、分布式环境下,同一个服务可能部署在多台服务器上,多个jvm之间无法通过常用的jvm锁来完成同步操作,需要借用分布式锁来完成上锁、释放锁。例如在订单服务中,我们需要根据日期来生成订单号流水,就有可能产生相同的时间日期,从而出现重复订单号。(jdk8使用LocalDateTime线程安全,不会存在这样的问题)
第一种方案实现较为简单,逻辑就是谁创建成功该节点,谁就持有锁,创建失败的自己进行阻塞,A线程先持有锁,B线程获取失败就会阻塞,同时对/lockPath设置监听,A线程执行完操作后删除节点,触发监听器,B线程此时解除阻塞,重新去获取锁。
我们模仿原生jdk的lock接口设计,采用模板方法设计模式来编写分布式锁,这样的好处是扩展性强,我们可以快速切换到redis分布式锁、数据库分布式锁等实现方式。
创建Lock接口
public interface Lock {
/**
* 获取锁
*/
void getLock() throws Exception;
/**
* 释放锁
*/
void unlock() throws Exception;
}
AbstractTemplateLock抽象类
public abstract class AbstractTemplateLock implements Lock {
@Override
public void getLock() {
if (tryLock()) {
System.out.println(Thread.currentThread().getName() + "获取锁成功");
} else {
//等待
waitLock();//事件监听 如果节点被删除则可以重新获取
//重新获取
getLock();
}
}
protected abstract void waitLock();
protected abstract boolean tryLock();
protected abstract void releaseLock();
@Override
public void unlock() {
releaseLock();
}
}
zookeeper分布式锁逻辑
@Slf4j
public class ZkTemplateLock extends AbstractTemplateLock {
private static final String zkServers = "127.0.0.1:2181";
private static final int sessionTimeout = 8000;
private static final int connectionTimeout = 5000;
private static final String lockPath = "/lockPath";
private ZkClient client;
public ZkTemplateLock() {
client = new ZkClient(zkServers, sessionTimeout, connectionTimeout);
log.info("zk client 连接成功:{}",zkServers);
}
@Override
protected void waitLock() {
CountDownLatch latch = new CountDownLatch(1);
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("监听到节点被删除");
latch.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {}
};
//完成 watcher 注册
client.subscribeDataChanges(lockPath, listener);
//阻塞自己
if (client.exists(lockPath)) {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//取消watcher注册
client.unsubscribeDataChanges(lockPath, listener);
}
@Override
protected boolean tryLock() {
try {
client.createEphemeral(lockPath);
System.out.println(Thread.currentThread().getName()+"获取到锁");
} catch (Exception e) {
log.error("创建失败");
return false;
}
return true;
}
@Override
public void releaseLock() {
client.delete(this.lockPath);
}
}
缺点:
每次去竞争锁,都只会有一个线程拿到锁,当线程数庞大时会发生“惊群”现象,zookeeper节点可能会运行缓慢甚至宕机。这是因为其他线程没获取到锁时都会监听/lockPath节点,当A线程释放完毕,海量的线程都同时停止阻塞,去争抢锁,这种操作十分耗费资源,且性能大打折扣。
临时顺序节点与临时节点不同的是产生的节点是有序的,我们可以利用这一特点,只让当前线程监听上一序号的线程,每次获取锁的时候判断自己的序号是否为最小,最小即获取到锁,执行完毕就删除当前节点继续判断谁为最小序号的节点。
临时顺序节点操作源码
@Slf4j
public class ZkSequenTemplateLock extends AbstractTemplateLock {
private static final String zkServers = "127.0.0.1:2181";
private static final int sessionTimeout = 8000;
private static final int connectionTimeout = 5000;
private static final String lockPath = "/lockPath";
private String beforePath;
private String currentPath;
private ZkClient client;
public ZkSequenTemplateLock() {
client = new ZkClient(zkServers);
if (!client.exists(lockPath)) {
client.createPersistent(lockPath);
}
log.info("zk client 连接成功:{}",zkServers);
}
@Override
protected void waitLock() {
CountDownLatch latch = new CountDownLatch(1);
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("监听到节点被删除");
latch.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {}
};
//给排在前面的节点增加数据删除的watcher,本质是启动另一个线程去监听上一个节点
client.subscribeDataChanges(beforePath, listener);
//阻塞自己
if (client.exists(beforePath)) {
try {
System.out.println("阻塞"+currentPath);
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//取消watcher注册
client.unsubscribeDataChanges(beforePath, listener);
}
@Override
protected boolean tryLock() {
if (currentPath == null) {
//创建一个临时顺序节点
currentPath = client.createEphemeralSequential(lockPath + "/", "lock-data");
System.out.println("current:" + currentPath);
}
//获得所有的子节点并排序。临时节点名称为自增长的字符串
List<String> childrens = client.getChildren(lockPath);
//排序list,按自然顺序排序
Collections.sort(childrens);
if (currentPath.equals(lockPath + "/" + childrens.get(0))) {
return true;
} else {
//如果当前节点不是排第一,则获取前面一个节点信息,赋值给beforePath
int curIndex = childrens.indexOf(currentPath.substring(lockPath.length() + 1));
beforePath = lockPath + "/" + childrens.get(curIndex - 1);
}
System.out.println("beforePath"+beforePath);
return false;
}
@Override
public void releaseLock() {
System.out.println("delete:" + currentPath);
client.delete(currentPath);
}
}
curator提供了以下种类的锁:
我们采用第一种Shared Reentrant Lock中的InterProcessMutex
来完成上锁、释放锁的的操作
public class ZkLockWithCuratorTemplate implements Lock {
// zk host地址
private String host = "localhost";
// zk自增存储node
private String lockPath = "/curatorLock";
// 重试休眠时间
private static final int SLEEP_TIME_MS = 1000;
// 最大重试1000次
private static final int MAX_RETRIES = 1000;
//会话超时时间
private static final int SESSION_TIMEOUT = 30 * 1000;
//连接超时时间
private static final int CONNECTION_TIMEOUT = 3 * 1000;
//curator核心操作类
private CuratorFramework curatorFramework;
InterProcessMutex lock;
public ZkLockWithCuratorTemplate() {
curatorFramework = CuratorFrameworkFactory.builder()
.connectString(host)
.connectionTimeoutMs(CONNECTION_TIMEOUT)
.sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME_MS, MAX_RETRIES))
.build();
curatorFramework.start();
lock = new InterProcessMutex (curatorFramework, lockPath);
}
@Override
public void getLock() throws Exception {
//5s后超时释放锁
lock.acquire(5, TimeUnit.SECONDS);
}
@Override
public void unlock() throws Exception {
lock.release();
}
}
源码以及测试类地址
https://github.com/Motianshi/distribute-tool
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。