1
前言
关于JUC包下的工具类,到目前为止已经分享了ReentranLock、Semaphore这两个工具类,同样很多前置内容在前面两遍博文中也都要讲到,那么今天所分享的是CountDownLatch工具类、通过前面博文我们知道ReentranLock是独占锁模式、Semaphore是共享锁模式、那么CountDownLatch是什么模式呢?CountDownLatch它是闭锁模式。
什么是闭锁?
闭锁可以延迟线程的进度直到其到达终止状态,闭锁可以用来确保某些活动直到其他活动都完成才继续执行
例如:
1.确保某个服务在其依赖的其他服务都启动之后才启动
2.等待某个操作的所有参与者都准备就绪才继续执行
2
什么是CountDownLatch
CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。
如上这段话可能不是那么好理解,我举一个生活中很通俗的例子:
一家人要外出自驾游,旅游的家庭成员总共有5位,只有当5位家庭成员全部上车了,爸爸才会开始开车出发对吗?假如爸爸现在通知家庭成员要出发了,其他的3位成员陆陆续续的上车,此时车上已有4位成员了,
还有一位小妹妹在上厕所,这个时候爸爸以及其他3位成员需要等待妹妹上车后,才会开车出发旅游。
3
CountDownLatch的使用场景
在我们平常开发过程中,需要对某些接口进行高平发测试,一般我们会想到通过jmeter性能工具进行压测,但是我们有没有java工具类去帮我实现这种并发测试的API工具呢?当然是有的,就是CountDownLacth工具类,我们可以在代码中模拟高并发测试接口的场景
以及上述所说的自驾游场景,当然大家也可以自己发挥想象,项目中是否有场景能够用到该工具类。
4
CountDownLatch源码详解
如何使用CountDownLatch
聊完CountDownLatch的使用场景后,我们来看看基于上面的场景通过CountDownLatch来实现相应的功能
/**
* @author sunny
* @date 2021/07/06 09:30
* @description
*/
@Slf4j
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
test01();
// test02();
// test03();
}
/**
* 模拟高并发场景
*
* @throws InterruptedException
*/
public static void test01() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
log.info("{}:线程已就绪,当前时间戳:{}", Thread.currentThread().getName(), System.currentTimeMillis());
countDownLatch.await();
log.info("{}:线程已释放,,当前时间戳:{}", Thread.currentThread().getName(), System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
TimeUnit.SECONDS.sleep(3);
System.out.println("\n ========================= \n");
countDownLatch.countDown();
}
/**
* 模拟家庭旅游场景
*/
public static void test02() {
CountDownLatch countDownLatch = new CountDownLatch(11);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
log.info("{}:已经上车了", Thread.currentThread().getName());
countDownLatch.countDown();
}).start();
}
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(5);
log.info("{}:五秒后已经上车了", Thread.currentThread().getName());
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("\n ========================= \n");
countDownLatch.await();
log.info("开车旅游啦");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 模拟去峨眉山游玩,但是因为中途有道路需要修路,所以需要等待修完后才能出发
* 那么修路的过程中
*/
public static void test03() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 1; i <= 9; i++) {
new Thread(() -> {
try {
log.info("{}:线程,我要准备前往峨眉山,路被堵住了", Thread.currentThread().getName());
countDownLatch.await();
log.info("{}:线程,道路终于可以通行了", Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
TimeUnit.SECONDS.sleep(3);
log.info("======================准备开始动工======================");
for (int i = 1; i <= 9; i++) {
Thread.sleep(300);
int fi = i;
new Thread(() -> {
countDownLatch.countDown();
log.info("{}:线程,已开工:{}天,剩余:{}天", Thread.currentThread().getName(), fi, countDownLatch.getCount());
}).start();
}
TimeUnit.SECONDS.sleep(6);
new Thread(() -> {
log.info("{}:线程,已开工", Thread.currentThread().getName());
countDownLatch.countDown();
}).start();
TimeUnit.SECONDS.sleep(8);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
countDownLatch.await();
log.info("{}:线程,去峨眉山游玩啦", Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}
源码分析
CountDownLatch有一个构造方法,传入的参数给state进行初始化,在CountDownLatch中state,我们就可以理解闭锁值,例如上面所说的家庭自驾游案例,当家庭成员全部到位,爸爸才会开车出发旅游对么?
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count); // 更新 state 值
}
首先从 "countDownLatch.await()" 作为源码入口
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
在该方法内有两个if判断,首先判断当前线程是否被中断,如果被中断了则直接抛出异常,而tryAcquireShared()方法则是通用模板方法,不同的子类根据自己的特性实现具体的逻辑
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
具体的加锁逻辑由子类自身的特性去具体实现的,在CountDownLatch中,它的加锁钩子方法如下所示,如果不进行重写该方法,则强制抛出异常。
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
接着我们走到CountDownLatch所实现逻辑代码块,该方法很简单,就是一个三元表达式,如果当前线程获取的state为0则代表无需进行等待,否则需要进行入队等待。就如刚刚前面所讲的自驾游场景案例,只有当所有成员全部上车了,才会开车出发,到这里还是很好理解的对吗?哈哈哈,我们接着往下看。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
然后我们回退到上一步,看到acquireSharedInterruptibly()方法,如果state大于0则返回-1,从而会进入到doAcquireSharedInterruptibly()方法,这个方法与Semaphore()逻辑几乎一样,只是我们需要理解概念所一样而已。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
然后我们进入到doAcquireSharedInterruptibly()方法,主要的逻辑都在自旋里面,但是外面同样也有个比较重要的方法,就是addWaiter()方法,该方法传入的参数值为 "Node.SHARED" ,而SHARED的值就是new Node() 也就是创建了一个空的节点,然后我们来看看addWaiter()方法其内部逻辑做了些什么事情?
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 构建双向链表 或 入队操作
boolean failed = true;
try {
for (;;) { // 自旋
final Node p = node.predecessor(); //获取当前节点的前驱节点
if (p == head) {
int r = tryAcquireShared(arg); // 尝试获取令牌
if (r >= 0) { // 获取令牌成功
setHeadAndPropagate(node, r); //传播链表
p.next = null; // help GC 将前驱节点的引用指向为NULL,待垃圾回收器回收
failed = false;
return; // 获取令牌成功,退出自旋
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 阻塞当前线程
throw new InterruptedException();
}
} finally {
// 如果某个线程被中断,非正常流程退出则将当前线程的节点设置为cancel状态
if (failed)
cancelAcquire(node);
}
}
使文字更好理解代码这里先做前缀说明,node = 当前节点,tail = 链表末尾节点,head = 链表头节点
首先将当前线程封装为node节点,接着获取tail节点,判断当前AQS中是否存在双向链表,如果存在的话,将node前驱节点引用指向tail节点,通过cas将node节点设置为末尾节点,如果设置成功则将tail节点的后驱引用指向node,那么node就顺理成章的成了双向链表的末尾节点了。关于这里我们其实需要思考一个问题,在多线程情况下同时通过cas去设置尾节点,此时只会有一个线程设置成功且返回出去,那接下来的线程该怎么办呢?且不急,带着这个疑问我们进入到enq方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // 封装节点
// Try the fast path of enq; backup to full enq on failure
Node pred = tail; // 获取末尾节点
if (pred != null) {
node.prev = pred; // 当前节点的前驱引用指向为pred
if (compareAndSetTail(pred, node)) { // 将当前节点设置为链表末尾节点
pred.next = node; // 原末尾节点后驱引用指向为当前节点
return node;
}
}
enq(node);
return node;
}
基于FIFO入队流程图
通过如下图理解上面这段话,我相信应该是能够明白的
使文字更好理解代码这里先做前缀说明,node = 当前节点,tail = 链表末尾节点,head = 链表头节点
得勒,进来就是一层自旋,注意这里的精华就是自旋,以及上面所提到多线程通过cas设置尾节点失败的解决方案就在此方法。
进入自旋获取链表的末尾节点,如果获取tail为null则证明当前并没有构成双向链表,接着通过cas去设置head,然后将head指向tail,这样双向链表就完成了,如果获取tail不为null,将node前驱引用指向tail节点,然后tail的后驱节点引用指向node节点,然后返回出去。那如果设置失败了怎么办呢?回到上面的问题,问题不大,这方法不是自旋嘛,它会一直自旋到你设置成功为止,才退出自旋。
private Node enq(final Node node) {
for (;;) {
Node t = tail; // 获取末尾节点
if (t == null) { // Must initialize // 构建双向链表
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
如果通过cas设置不成功,就一直进行自旋,直到设置成功才退出循环。
接着,回退到doAcquireSharedInterruptibly()方法,通过上面的流程下来,我们就知道node节点现在已经成功入队到双向链表中,接着判断如果当前节点的前驱节点是为头节点此时会尝试获取令牌,如果获取失败则将线程进行阻塞,同理当前节点的前驱节点不是链表的头节点,也会将当前线程进行阻塞。无论如何只要令牌没有了,就得老老实实的在队列中进行呆着,直到下一次的唤醒。
那如果线程为头节点且获取令牌成功了,setHeadAndPropagate()方法又会做些什么事情呢?带着这个疑问,我们进去一探究竟
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 构建双向链表 或 入队操作
boolean failed = true;
try {
for (;;) { // 自旋
final Node p = node.predecessor(); //获取当前节点的前驱节点
if (p == head) {
int r = tryAcquireShared(arg); // 尝试获取令牌
if (r >= 0) { // 获取令牌成功
setHeadAndPropagate(node, r); //传播链表
p.next = null; // help GC 将前驱节点的引用指向为NULL,待垃圾回收器回收
failed = false;
return; // 获取令牌成功,退出自旋
}
}
if (shouldParkAfterFailedAcquire(p, node) && //判断线程是否需要被阻塞
parkAndCheckInterrupt()) // 阻塞当前线程
throw new InterruptedException();
}
} finally {
// 如果某个线程被中断,非正常流程退出则将当前线程的节点设置为cancel状态
if (failed)
cancelAcquire(node);
}
}
首先我们看到该方法的入参内容,node:当前获取令牌线程节点,propagate: 值是根据获取state是否等于0判断,如果等0这么为1否则为-1
该方法主要作用在于两点,第一点:将当前节点设置为头节点,第二点:自动唤醒下一个节点
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 || // 还有令牌可获取 || 头节点状态处于等待状态
(h = head) == null || h.waitStatus < 0) {
Node s = node.next; // 获取当前下一节点
if (s == null || s.isShared()) // 判断下节点是否为共享节点
doReleaseShared(); // 传播~~ 具体传播什么呢???
}
}
稍微可以看下设置头节点方法,也就是出队操作,主要就是将当前线程设置为头节点,然后将当前节点的前驱节点引用指向为null,配合方法外,会将之前的头节点的next节点设置为null,那么之前的头节点也就自然会被垃圾回收器进行
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
基于FIFO出队流程图
又一次来到自旋,首先验证链表中是否还存在多个节点,如果存在且状态为SIGNAL会将head的后驱节点进行唤醒。这里没啥太多好说的,就是一个传播概念,当你有多个节点在阻塞中,当state为0,是不是我的所有阻塞节点都需要被唤醒,然后执行后续的逻辑对么?
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) { // 自旋 可以理解为传播 【加自旋的原因,可能同时有多个令牌被释放,那么在这里就可以唤醒后续所有节点去获取令牌,就不用在前面再去判断是否要去唤醒后驱节点了。 如果没有获取到令牌也没关系,后面还是会将没有抢到的线程进行阻塞住】
Node h = head;
if (h != null && h != tail) { // 头节点不为null 其 头非等于尾节点 则证明当前链表还有多个节点
int ws = h.waitStatus; // 获取head的节点状态
if (ws == Node.SIGNAL) { // 如果当前节点状态为SIGNAL,就代表后驱节点正在被阻塞着
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 通过cas将状态从等待更换为非等待,然后取反的话,将下一个节点唤醒
continue; // loop to recheck cases
unparkSuccessor(h); // 唤醒线程 去获取令牌
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 如果节点状态已经为0,则会将节点的状态更新为PROPAGATE PROPAGATE:表示下一次共享式同步状态获取将会被无条件地传播下去
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break; // 跳出当前循环
}
}
unparkSuccessor()方法很简单,在正常流程下它只会通过LockSupport.unpark(),将下一节点进行唤醒
private void unparkSuccessor(Node node) {
// 先获取head节点的状态,应该是等于-1,原因在shouldParkAfterFailedAcquire方法中有体现
int ws = node.waitStatus;
// 由于-1会小于0,所以更新改为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取第一个正常排队的节点
Node s = node.next;
//正常解锁流程不会走该if判断
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 正常来说第一个排队的节点不应该为空,所以直接把第一个排队的线程唤醒
if (s != null)
LockSupport.unpark(s.thread);
}
接下来看看countDown()方法到底也做了些什么流程操作
首先从 "countDownLatch.countDown()" 作为源码入口
sync.releaseShared(1);
我们来看到releaseShared方法,该方法内部有两个核心方法,我们先进入看看tryReleaseShared做了些什么事情
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //通用释放令牌
doReleaseShared(); //唤醒后驱节点
return true;
}
return false;
}
我们又看到了自旋,判断当前的state值是否等于0,等于0则代表需要提前的准备的线程都已就绪,主线程也可以执行剩下的业务逻辑啦,那如果不为0怎么办?一直自减,直到减到state为0,然后将链表内的线程全部进行唤醒。也就是会走到我上面所说到的doReleaseShared()方法
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
那么到这CountDownLatch源码分析到此结束了,相信大家伙如果看过我前面两篇文章,再看这篇博文会发现理解起来非常简单的。
JUC并发编程之CountDownLatch源码讲解视频
我是黎明大大,我知道我没有惊世的才华,也没有超于凡人的能力,但毕竟我还有一个不屈服,敢于选择向命运冲锋的灵魂,和一个就是伤痕累累也要义无反顾走下去的心。
如果您觉得本文对您有帮助,还请关注点赞一波,后期将不间断更新更多技术文章