前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >基于Zookeeper实现分布式锁,如何解决羊群问题?

基于Zookeeper实现分布式锁,如何解决羊群问题?

作者头像
架构探险之道
发布2023-03-04 11:01:13
4300
发布2023-03-04 11:01:13
举报
文章被收录于专栏:架构探险之道
基于Zookeeper实现分布式锁,如何解决羊群问题?

目录:

理解分布式锁

我们先来看一个问题,如图所示,两个用户同时去抢购秒杀商品,当秒杀服务同时收到秒杀请求 时,都去进行库存扣减,此时在没有做任何处理的情况下,就会导致库存数量变成负数从而导致超卖现 象。

这种情况下我们一般会选择加锁的方式来避免并发的问题。但是在分布式场景中,采用传统的锁并不能

解决跨进程并发的问题,所以需要引入一个分布式锁,来解决多个节点之间的访问控制。

Zookeeper如何解决分布式锁

我们可以基于Zookeeper的两种特性来实现分布式锁,先来说第一种。

使用唯一节点特性实现分布式锁

分布式锁:创建节点

就是基于唯一节点特性,如图所示。多个应用程序去抢占锁资源时,只需要在指定节点上创建一 个 /Lock 节点,由于Zookeeper中节点的唯一性特性,使得只会有一个用户成功创建 /Lock 节点,剩 下没有创建成功的用户表示竞争锁失败。

这种方法能达到目的,但是会有一个问题,如图所示,假设有非常多的节点需要等待获得锁,那么等待的方式自然是使用 Watcher 机制来监听 /lock 节点的删除事件, 一旦发现该节点被删除说明之前获得锁的节点已经释放了锁,此时剩下的 B、C、D。。节点同时会收到删除事件从而去竞争锁,这个过程会产生惊群效应。

问题:

  • 大量线程竞争锁,获取不到锁时,需要监听该节点的删除事件
  • 删除后,会触发惊群效应
  • 多个并发线程监听,对于 Zookeeper 也是种负担,对于分布式协调一致性的中间件,性能的考量也是十分重要的。

但是会产生“惊群效应”,简单来说就是如果存在许多的客户端在等待获取锁,当成功获取到锁的进程释放该节点后,所有处于等待状态的客户端都会被唤醒,这个时候 zookeeper 在短时间内发送大量子节点变更事件给所有待获取锁的客户端,然后实际情况是只会有一个客户端获得锁。如果在集群规模比较大 的情况下,会对 zookeeper 服务器的性能产生比较的影响。

分布式锁

使用有序节点实现分布式锁

因此为了解决这个问题,我们可以采用 Zookeeper 的有序节点特性来实现分布式锁。

如图所示,每个客户端都往指定的节点下注册一个临时有序节点,越早创建的节点,节点的顺序编号就越小,那么我们可以判断子节点中最小的节点设置为获得锁。如果自己的节点不是所有子节点中最小的,意味着还没有获得锁。这个的实现和前面单节点实现的差异性在于,每个节点只需要监听比自己小的节点,当比自己小的节点删除以后,客户端会收到 watcher 事件,此时再次判断自己的节点是不是 所有子节点中最小的,如果是则获得锁,否则就不断重复这个过程,这样就不会导致羊群效应,因为每个客户端只需要监控一个节点。

使用临时有序节点

如图所示,表示有序节点实现分布式锁的流程。

Curator 实现分布式锁源码解读

在本节中我们使用Curator来实现分布式锁。为了实现分布式锁,我们先演示一个存在并发异常的场

景。

代码语言:javascript
复制
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>5.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>5.2.0</version>
    </dependency>

Spring Boot 工程模拟订单多线程并发减库存操作,通过 Jmeter 模拟。

如果上述代码使用jmeter进行压测,用1500个线程,库存数量设置成100,监视数据库中库存的变化发现, 整个库存变化过程是非常混乱的。

代码语言:javascript
复制
@Configuration
public class CuratorConfig {

    @Bean
    public CuratorFramework curatorFramework(){
        CuratorFramework curatorFramework=CuratorFrameworkFactory
                .builder()
                .connectString("loclahost:2181")
                .sessionTimeoutMs(15000)
                .connectionTimeoutMs(20000)
                .retryPolicy(new ExponentialBackoffRetry(1000,10))
                .build();
        curatorFramework.start();
        return curatorFramework;
    }
}


@Scope(scopeName = "prototype")
@RestController
@RequestMapping("/goods-stock")
public class GoodsStockController {


    @Autowired
    IGoodsStockService goodsStockService;

    @Autowired
    CuratorFramework curatorFramework;

    @GetMapping("{goodsNo}")
    public String  purchase(@PathVariable("goodsNo")Integer goodsNo) throws Exception {
        QueryWrapper<GoodsStock> queryWrapper=new QueryWrapper<>();
        queryWrapper.eq("goods_no",goodsNo);
        //基于临时有序节点来实现的分布式锁.
        InterProcessMutex lock=new InterProcessMutex(curatorFramework,"/Locks");
        try {
            lock.acquire(); //抢占分布式锁资源(阻塞的)
            GoodsStock goodsStock = goodsStockService.getOne(queryWrapper);
            Thread.sleep(new Random().nextInt(1000));
            if (goodsStock == null) {
                return "指定商品不存在";
            }
            if (goodsStock.getStock().intValue() < 1) {
                return "库存不够";
            }
            goodsStock.setStock(goodsStock.getStock() - 1);
            boolean res = goodsStockService.updateById(goodsStock);
            if (res) {
                return "抢购书籍:" + goodsNo + "成功";
            }
        }finally {
            lock.release(); //释放锁
        }
        return "抢购失败";
    }
}

构造方法

代码语言:javascript
复制
    InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver)
    {
        basePath = PathUtils.validatePath(path); // Locks 目录
        internals = new LockInternals(client, driver, path, lockName, maxLeases);
    }

acquire

代码语言:javascript
复制
    @Override
    public void acquire() throws Exception
    {
        if ( !internalLock(-1, null) )
        {
            throw new IOException("Lost connection while trying to acquire lock: " + basePath);
        }
    }

internalLock

代码语言:javascript
复制
 private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */

        Thread currentThread = Thread.currentThread();
		// 判断当前线程是否获得过锁 (重入特性设计,避免死锁)
        LockData lockData = threadData.get(currentThread);
        if ( lockData != null ) // 说明当前线程获取到了锁
        {
        	// 同一线程再次acquire,首先判断当前的映射表内(threadData)是否有该线程的锁信息,如 果有则原子+1,然后返回
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }

		// 1. Zookeeper server 创建临时有序节点
        // 2. 判断创建的节点的序号是否是所有节点中最小的,如果是,则获得锁成功并返回
        // 3. 否则,等待
        // 映射表内没有对应的锁信息,尝试通过LockInternals获取锁
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
        	// 成功获取锁,记录信息到映射表
        	// 构建一个锁数据
            LockData newLockData = new LockData(currentThread, lockPath);
            // 存到当前线程级缓存中
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }
代码语言:javascript
复制
// 并发安全的全局 MAP 
// 映射表
// 记录线程与锁信息的映射关系
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

// 锁信息
// Zookeeper中一个临时顺序节点对应一个“锁”,但让锁生效激活需要排队(公平锁),下面会继续分析 
private static class LockData{
    
	final Thread owningThread;
    
 	final String lockPath;
    
	final AtomicInteger lockCount = new AtomicInteger(1); // 分布式锁重入次数
    
    private LockData(Thread owningThread, String lockPath){
        this.owningThread = owningThread;
        this.lockPath = lockPath;
} }

attemptLock

尝试获得锁,实际上是向zookeeper注册一个临时有序节点,并且判断当前创建的节点的顺序是否是最小节点。如果是则表示获得锁成功

代码语言:javascript
复制
 // 尝试获取锁,并返回锁对应的Zookeeper临时顺序节点的路径 
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        final long      startMillis = System.currentTimeMillis();
        // 无限等待时,millisToWait为null
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        // 创建ZNode节点时的数据内容,无关紧要,这里为null
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        
       int             retryCount = 0;// 当前已经重试次数,与CuratorFramework的重试策略有关
		
        // 在Zookeeper中创建的临时顺序节点的路径,相当于一把待激活的分布式锁 
        // 激活条件:同级目录子节点,名称排序最小(排队,公平锁),后续继续分析
        String          ourPath = null;
        boolean         hasTheLock = false;
        // 是否已经完成尝试获取分布式锁的操作
        boolean         isDone = false;
        while ( !isDone )
        {
            isDone = true;

            try
            {
				// 从InterProcessMutex的构造函数可知实际driver为 StandardLockInternalsDriver的实例
				// 在Zookeeper中创建临时顺序节点
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                // 循环等待来激活分布式锁,实现锁的公平性,后续继续分析
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
            	// 容错处理,不影响主逻辑的理解,可跳过
				// 因为会话过期等原因,StandardLockInternalsDriver因为无法找到创建的临时顺序 节点而抛出NoNodeException异常
                // gets thrown by StandardLockInternalsDriver when it can't find the lock node
                // this can happen when the session expires, etc. So, if the retry allows, just try it all again
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                	// 不满足重试策略则继续抛出NoNodeException
                    throw e;
                }
            }
        }
        
		// 成功获得分布式锁,返回临时顺序节点的路径,上层将其封装成锁信息记录在映射表,方便锁重入
        if ( hasTheLock )
        {
            return ourPath;
        }

        return null;
    }

createsTheLock

在 Zookeeper 中创建临时顺序节点,并返回节点名称

代码语言:javascript
复制
@Override
    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
    {
        String ourPath;
        // lockNodeBytes不为null则作为数据节点内容,否则采用默认内容(IP地址)
        if ( lockNodeBytes != null )
        {
        	// creatingParentContainersIfNeeded:用于创建容器节点 
        	// withProtection:临时子节点会添加GUID前缀
            ourPath = client.create()
            .creatingParentContainersIfNeeded()
            .withProtection()
            .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
            .forPath(path, lockNodeBytes);
        }
        else
        {
        	// CreateMode.EPHEMERAL_SEQUENTIAL:临时顺序节点,Zookeeper能保证在节点产生的顺序性
			// 依据顺序来激活分布式锁,从而也实现了分布式锁的公平性,后续继续分析
            ourPath = client.create()
            .creatingParentContainersIfNeeded()
            .withProtection()
            .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }

internalLockLoop

代码语言:javascript
复制
 // 循环等待来激活分布式锁,实现锁的公平性
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;// 是否已经持有分布式锁
        boolean     doDelete = false;// 是否需要删除子节点
        try
        {
            if ( revocable.get() != null ) // 锁撤销,避免死锁
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }
			//在没有获得锁,并且链接是启动状态的情况下持续循环
            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                List<String>        children = getSortedChildren();// 获取排序后的子节点列表
                // 获取前面自己创建的临时顺序子节点的名称(序列号)
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
				// 实现锁的公平性的核心逻辑,看下面的分析
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
				// 获得了锁((比较序列号是否是最小的)),中断循环,继续返回上
				if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                	// 没有获得到锁,监听上一临时顺序节点
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try
                        {
                        	// exists()会导致导致资源泄漏,因此exists()可以监听不存在的ZNode,因此采用getData()
                            // 上一临时顺序节点如果被删除,会唤醒当前线程继续竞争锁,正常情况下能直接获得锁,因为锁是公平的 
                            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            //是否有超时机制
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                	// 获取锁超时,标记删除之前创建的临时顺序节点
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }

                                wait(millisToWait);//限时等待被唤醒
                            }
                            else
                            {
                                wait();//不限时等待
                            }
                        }
                        catch ( KeeperException.NoNodeException e )
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                       	// 容错处理,逻辑稍微有点绕,可跳过,不影响主逻辑的理解
						// client.getData()可能调用时抛出NoNodeException,原因可能是锁被释放或会话过期(连接丢失)等
						// 这里并没有做任何处理,因为外层是while循环,再次执行 driver.getsTheLock时会调用validateOurIndex
						// 此时会抛出NoNodeException,从而进入下面的catch和finally逻辑,重 新抛出上层尝试重试获取锁并删除临时顺序节
                       }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            doDelete = true;// 标记删除,在finally删除之前创建的临时顺序节点(后台不断尝试)
            throw e;// 重新抛出,尝试重新获取锁
        }
        finally
        {
            if ( doDelete )
            {
                deleteOurPath(ourPath);//删除当前节点
            }
        }
        return haveTheLock;
    }

driver.getsTheLock

StandardLockInternalsDriver

代码语言:javascript
复制
 @Override
    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
    {
    	// 之前创建的临时顺序节点在排序后的子节点列表中的索引
        int             ourIndex = children.indexOf(sequenceNodeName);
        // 校验之前创建的临时顺序节点是否有效
        validateOurIndex(sequenceNodeName, ourIndex);
// 锁公平性的核心逻辑
// 由InterProcessMutex的构造函数可知,maxLeases为1,即只有ourIndex为0时,线程才能 持有锁,或者说该线程创建的临时顺序节点激活了锁
// Zookeeper的临时顺序节点特性能保证跨多个JVM的线程并发创建节点时的顺序性,越早创建临时 顺序节点成功的线程会更早地激活锁或获得锁
        boolean         getsTheLock = ourIndex < maxLeases;
        // 如果已经获得了锁,则无需监听任何节点,否则需要监听上一顺序节点(ourIndex-1)
// 因为锁是公平的,因此无需监听除了(ourIndex-1)以外的所有节点,这是为了减少羊群效应,非常巧妙的设计!!
        String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
// 返回获取锁的结果,交由上层继续处理(添加监听等操作)
        return new PredicateResults(pathToWatch, getsTheLock);
    }
    
    static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException
    {
        if ( ourIndex < 0 )
        {
        // 容错处理,可跳过
// 由于会话过期或连接丢失等原因,该线程创建的临时顺序节点被Zookeeper服务端删除,往外抛出NoNodeException
// 如果在重试策略允许范围内,则进行重新尝试获取锁,这会重新重新生成临时顺序节点
// 佩服Curator的作者将边界条件考虑得如此周到!
            throw new KeeperException.NoNodeException("Sequential path not found: " + sequenceNodeName);
        }
    }

释放锁的逻辑

release

代码语言:javascript
复制
    @Override
    public void release() throws Exception
    {
        /*
            Note on concurrency: a given lockData instance
            can be only acted on by a single thread so locking isn't necessary
         */

        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        // 无法从映射表中获取锁信息,表示当前没有持有锁
        if ( lockData == null )
        {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }
		// 锁是可重入的,初始值为1,原子-1到0,锁才释放
        int newLockCount = lockData.lockCount.decrementAndGet();
        if ( newLockCount > 0 )
        {
            return;
        }
        if ( newLockCount < 0 )
        {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try
        {
       	 // lockData != null && newLockCount == 0,释放锁资源
            internals.releaseLock(lockData.lockPath);
        }
        finally
        {
        // 最后从映射表中移除当前线程的锁信息
            threadData.remove(currentThread);
        }
    }
代码语言:javascript
复制
    final void releaseLock(String lockPath) throws Exception
    {
        client.removeWatchers();//移除订阅事件
        revocable.set(null);// 删除临时顺序节点,只会触发后一顺序节点去获取锁,理论上不存在竞争,只排队,非抢占,公平
锁,先到先得
        deleteOurPath(lockPath);
    }

    private void deleteOurPath(String ourPath) throws Exception
    {
        try
        {
// 后台不断尝试删除
            client.delete().guaranteed().forPath(ourPath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            // ignore - already deleted (possibly expired session, etc.)
        }
    }

锁撤销

InterProcessMutex支持一种协商撤销互斥锁的机制, 可以用于死锁的情况 想要撤销一个互斥锁可以调用下面这个方法:

代码语言:javascript
复制
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>
{
//...

    @Override
    public void makeRevocable(RevocationListener<InterProcessMutex> listener)
    {
        makeRevocable(listener, MoreExecutors.directExecutor());
    }

    @Override
    public void makeRevocable(final RevocationListener<InterProcessMutex> listener, Executor executor)
    {
        internals.makeRevocable(new RevocationSpec(executor, new Runnable()
            {
                @Override
                public void run()
                {
                    listener.revocationRequested(InterProcessMutex.this);
                }
            }));
    }

这个方法可以让锁持有者来处理撤销动作。当其他进程/线程想要你释放锁时,就会回调参数中的监听

器方法。但是,此方法不是强制撤销的,是一种协商机制。当想要去撤销/释放一个锁时,可以通过 Revoker 中的静态方法来发出请求,

Revoker.attemptRevoke();

代码语言:javascript
复制
 public static void attemptRevoke(CuratorFramework client,String path) throws
Exception
  • path: 加锁的 zk 节点 path,通常可以通过 InterProcessMutex.getParticipantNodes() 获得这个方法会发出撤销某个锁的请求。如果锁的持有者注册了上述的 RevocationListener 监听器,那么就会调用监听器方法协商撤销锁。
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-09-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 架构探险之道 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 理解分布式锁
  • Zookeeper如何解决分布式锁
    • 使用唯一节点特性实现分布式锁
    • 分布式锁
      • 使用有序节点实现分布式锁
      • Curator 实现分布式锁源码解读
        • 构造方法
          • acquire
            • internalLock
              • attemptLock
                • createsTheLock
                  • internalLockLoop
                    • driver.getsTheLock
                      • 释放锁的逻辑
                        • 锁撤销
                        相关产品与服务
                        消息队列
                        腾讯云消息队列 TDMQ 是分布式架构中的重要组件,提供异步通信的基础能力,通过应用解耦降低系统复杂度,提升系统可用性和可扩展性。TDMQ 产品系列提供丰富的产品形态,包含 CKafka、RocketMQ、RabbitMQ、Pulsar、CMQ 五大产品,覆盖在线和离线场景,满足金融、互联网、教育、物流、能源等不同行业和场景的需求。
                        领券
                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档