首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >JDK源码解读:CountDownLatch源码解析

JDK源码解读:CountDownLatch源码解析

原创
作者头像
后台技术汇
修改2024-11-20 11:07:32
修改2024-11-20 11:07:32
3590
举报

“好事”文章分享

作者:王二蛋

文章:https://cloud.tencent.com/developer/article/2465456

这篇文章详细介绍了作者花了一个月时间学习大数据技术的心得体会,涵盖了大数据技术的发展历程、主要技术(如Hadoop、Spark、Flink、Hive、HBase、Yarn)的背景、解决问题及其优缺点。作者强调了理解技术思想的重要性,并指出大数据的分布式思想已广泛应用于各领域。

背景

CountDownLatch是 Java 并发包java.util.concurrent中的一个同步工具类,它允许一个或多个线程等待其他线程完成操作。CountDownLatch的工作原理是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了任务,计数器的值就会减一。当计数器的值变为 0 时,表示所有线程均已完成任务,然后在闭锁上等待的线程就可以恢复执行任务。

经典案例

代码语言:txt
复制

/**
 * CountDownLatch
 * 倒计时,拦截某个线程(CountDownLatch.await()),当倒计时为0时,将线程放行
 */
@Slf4j
public class TestCountDownLatch {

    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    private static final ThreadPoolExecutor threadPoolExecutor = ThreadPoolUtils.newThreadPool(
            1,
            1,
            0,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1), null, null
    );

    public static void main(String[] args) throws InterruptedException {

        new Thread(() -> {
            try {
                log.info("thread finish execute... count = {}", countDownLatch.getCount());
                // 阻塞
                countDownLatch.await();
                log.info("thread finish execute... count = {}", countDownLatch.getCount());
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).start();

        threadPoolExecutor.execute(
                new Runnable() {
                    @Override
                    public void run() {
                        log.info("threadPoolExecutor execute task... count = {}", countDownLatch.getCount());
                        // 减一
                        countDownLatch.countDown();
                        log.info("threadPoolExecutor execute task... count = {}", countDownLatch.getCount());
                    }
                }
        );


        Thread.sleep(1000);
        threadPoolExecutor.shutdownNow();
    }
}

构造器

代码语言:txt
复制
public class CountDownLatch {
    private final Sync sync;

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
}
  1. CountDownLatch内部封装了一个 Sync sync;
    1. sync 是一个私有且不可变的成员变量,类型为 Sync,是一个内部类,作用是用于管理计数器的状态。
  2. CountDownLatch(int count) 传入一个 初始化计数器的值。
    1. 检查传入的 count 是否小于0,如果是,则抛出 IllegalArgumentException 异常
    2. 否则,创建一个新的 Sync 对象,并将其赋值给 sync 成员变量。

Sync对象

代码语言:txt
复制
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

Sync 是一个静态内部类,继承自 AbstractQueuedSynchronizer,即AQS,主要功能如下:

  1. 构造函数 Sync(int count):初始化同步器的状态为给定的计数值。
  2. 方法 getCount():返回当前的计数值。
  3. 方法 tryAcquireShared(int acquires):尝试获取共享锁,如果当前计数值为0,则返回1表示成功;否则返回-1表示失败。
  4. 方法 tryReleaseShared(int releases):尝试释放共享锁,通过递减计数值并检查是否为0来决定是否通知等待线程。

也就是说,CountDownLatch是通过 Sync对象 实现锁资源共享和释放。

常用方法

await阻塞当前线程源码

代码语言:txt
复制
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

以上是重载的两个方法,一个没有传入,一个有传入数据;一个无返回值,一个有返回值;

【1】await()

这是 CountDownLatch 的基本等待方法,它会让当前线程无限期地等待,直到计数器达到零。如果计数器在某个时刻达到零,那么 await() 方法会立即返回,并且线程可以继续执行。

【2】await(long timeout, TimeUnit unit)

这个方法与基本的 await() 方法类似,但它增加了一个超时参数。

这意味着,如果计数器在指定的超时时间内没有达到零,那么 await(long timeout, TimeUnit unit) 方法会返回 false,并且线程可以继续执行,即使计数器还没有达到零。

上面两者区别主要是超时机制的处理。

await()方法会让线程无限期地等待,直到计数器达到零;

await(long timeout, TimeUnit unit)方法则允许线程在等待一段时间后继续执行,即使计数器还没有达到零。

这个区别,就是 await(long timeout, TimeUnit unit) 多了一个超时判断步骤,所在源码如下:

代码语言:txt
复制
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
            //【1】回调了内部Sync的tryAcquireShared方法
        return tryAcquireShared(arg) >= 0 ||
                    //【2】如果前面判断为false,那么后面这里接着执行超时等待机制
                    doAcquireSharedNanos(arg, nanosTimeout);
    }
    
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        //【3】设置截止时间:计算截止时间deadline
        final long deadline = System.nanoTime() + nanosTimeout;
        //【4】将当前线程加入等待队列
        final Node node = addWaiter(Node.SHARED);
        try {
            // 进入一个死循环
            for (;;) {
                final Node p = node.predecessor();
                //【5】检查前驱结点是否是头结点
                if (p == head) {
                    //【6】如果成功获取锁
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 则更新头节点并传播状态,返回true
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return true;
                    }
                }
                // 【7】如果未成功获取锁,检查剩余时间是否已过期
                nanosTimeout = deadline - System.nanoTime();
                // 【8】如果过期则取消等待并返回false
                if (nanosTimeout <= 0L) {
                    //【9】取消申请
                    cancelAcquire(node);
                    return false;
                }
                // 【10】如果未过期且满足条件,则挂起当前线程一段时间(1000L纳秒)
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                    LockSupport.parkNanos(this, nanosTimeout);
                // 【11】如果线程被中断,则抛出InterruptedException
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            //【12】异常处理:捕获并处理可能抛出的异常,取消等待并重新抛出异常。取消申请
            cancelAcquire(node);
            throw t;
        }
    }    

代码分析:

  1. 【1】回调了内部Sync的tryAcquireShared方法
  2. 【2】如果前面判断为false,那么后面这里接着执行超时等待机制
    1. 【3】设置截止时间:计算截止时间deadline
    2. 【4】将当前线程加入等待队列/
    3. 进入一个for死循环,除非正常或者异常退出
    4. 【5】检查前驱结点是否是头结点
    5. 【6】如果成功获取锁,则更新头节点并传播状态,返回true
    6. 【7】如果未成功获取锁,检查剩余时间是否已过期
    7. 【8】如果过期则取消等待并返回false
      1. 【9】取消申请
    8. 【10】如果未过期且满足条件,则挂起当前线程一段时间(1000L纳秒)
    9. 【11】如果线程被中断,则抛出InterruptedException
    10. 【12】异常处理:捕获并处理可能抛出的异常,取消等待并重新抛出异常。取消申请

countDown:倒计时源码

代码语言:txt
复制
    public void countDown() {
        sync.releaseShared(1);
    }
    
    public final boolean releaseShared(int arg) {
        // 【1】回调CountDownLatch#Sync的tryReleaseShared方法,返回倒计时是否等于0的判断
        if (tryReleaseShared(arg)) {
            // 【2】如果倒计时到0了,则进行释放倒计时
            doReleaseShared();
            return true;
        }
        return false;
    }

代码分析:

【1】回调CountDownLatch#Sync的tryReleaseShared方法,返回倒计时是否等于0的判断

【2】如果倒计时到0了,则进行释放倒计时

源码分析:

【1】会回调CountDownLatch#Sync的tryReleaseShared方法

代码语言:txt
复制
        protected boolean tryReleaseShared(int releases) {
            // 倒计时是否到0了
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

【2】 doReleaseShared();

代码语言:txt
复制
    private void doReleaseShared() {
    //【1】在一个无限循环中,检查当前头节点是否不为空且不是尾节点。
        for (;;) {
            Node h = head;
            //【2】检查头节点状态
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //【3】如果头节点的状态为 SIGNAL,尝试将其状态设置为 0 并唤醒其后继节点。
                if (ws == Node.SIGNAL) {
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                //【4】如果头节点的状态为 0,尝试将其状态设置为 PROPAGATE 以确保释放操作继续传播
                else if (ws == 0 &&
                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

总结

CountDownLatch 使用注意事项

  1. 不可重用:CountDownLatch是一次性的,计数器归零后无法再次使用。
  2. 异常处理:在使用await()时要注意处理InterruptedException。
  3. 公平性问题:默认情况下,等待的线程可能不会按照到达的先后顺序被唤醒,若需严格保证顺序,可以考虑使用其他同步工具如CyclicBarrier。

CountDownLatch 与其他同步工具的区别

  • 与Semaphore的区别
    • Semaphore控制的是同时访问某一资源的线程数量。
    • CountDownLatch关注的是一个或多个线程等待其他线程完成操作。
  • 与CyclicBarrier的区别
    • CyclicBarrier允许一组线程互相等待到达某个点后再继续执行,并且可以重复使用。
    • CountDownLatch只能单向等待,且不可复用。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • “好事”文章分享
  • 背景
    • 经典案例
    • 构造器
      • Sync对象
    • 常用方法
      • await阻塞当前线程源码
      • 【1】await()
      • 【2】await(long timeout, TimeUnit unit)
      • countDown:倒计时源码
  • 总结
    • CountDownLatch 使用注意事项
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档