作者:王二蛋
文章:https://cloud.tencent.com/developer/article/2465456
这篇文章详细介绍了作者花了一个月时间学习大数据技术的心得体会,涵盖了大数据技术的发展历程、主要技术(如Hadoop、Spark、Flink、Hive、HBase、Yarn)的背景、解决问题及其优缺点。作者强调了理解技术思想的重要性,并指出大数据的分布式思想已广泛应用于各领域。
CountDownLatch
是 Java 并发包java.util.concurrent
中的一个同步工具类,它允许一个或多个线程等待其他线程完成操作。CountDownLatch
的工作原理是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了任务,计数器的值就会减一。当计数器的值变为 0 时,表示所有线程均已完成任务,然后在闭锁上等待的线程就可以恢复执行任务。
/**
* CountDownLatch
* 倒计时,拦截某个线程(CountDownLatch.await()),当倒计时为0时,将线程放行
*/
@Slf4j
public class TestCountDownLatch {
private static final CountDownLatch countDownLatch = new CountDownLatch(1);
private static final ThreadPoolExecutor threadPoolExecutor = ThreadPoolUtils.newThreadPool(
1,
1,
0,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1), null, null
);
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
try {
log.info("thread finish execute... count = {}", countDownLatch.getCount());
// 阻塞
countDownLatch.await();
log.info("thread finish execute... count = {}", countDownLatch.getCount());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
threadPoolExecutor.execute(
new Runnable() {
@Override
public void run() {
log.info("threadPoolExecutor execute task... count = {}", countDownLatch.getCount());
// 减一
countDownLatch.countDown();
log.info("threadPoolExecutor execute task... count = {}", countDownLatch.getCount());
}
}
);
Thread.sleep(1000);
threadPoolExecutor.shutdownNow();
}
}
public class CountDownLatch {
private final Sync sync;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
}
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
Sync 是一个静态内部类,继承自 AbstractQueuedSynchronizer,即AQS,主要功能如下:
也就是说,CountDownLatch是通过 Sync对象 实现锁资源共享和释放。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
以上是重载的两个方法,一个没有传入,一个有传入数据;一个无返回值,一个有返回值;
【1】await()
这是 CountDownLatch
的基本等待方法,它会让当前线程无限期地等待,直到计数器达到零。如果计数器在某个时刻达到零,那么 await()
方法会立即返回,并且线程可以继续执行。
【2】await(long timeout, TimeUnit unit)
这个方法与基本的 await()
方法类似,但它增加了一个超时参数。
这意味着,如果计数器在指定的超时时间内没有达到零,那么 await(long timeout, TimeUnit unit)
方法会返回 false
,并且线程可以继续执行,即使计数器还没有达到零。
上面两者区别主要是超时机制的处理。
await()
方法会让线程无限期地等待,直到计数器达到零;
await(long timeout, TimeUnit unit)
方法则允许线程在等待一段时间后继续执行,即使计数器还没有达到零。
这个区别,就是 await(long timeout, TimeUnit unit) 多了一个超时判断步骤,所在源码如下:
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//【1】回调了内部Sync的tryAcquireShared方法
return tryAcquireShared(arg) >= 0 ||
//【2】如果前面判断为false,那么后面这里接着执行超时等待机制
doAcquireSharedNanos(arg, nanosTimeout);
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
//【3】设置截止时间:计算截止时间deadline
final long deadline = System.nanoTime() + nanosTimeout;
//【4】将当前线程加入等待队列
final Node node = addWaiter(Node.SHARED);
try {
// 进入一个死循环
for (;;) {
final Node p = node.predecessor();
//【5】检查前驱结点是否是头结点
if (p == head) {
//【6】如果成功获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
// 则更新头节点并传播状态,返回true
setHeadAndPropagate(node, r);
p.next = null; // help GC
return true;
}
}
// 【7】如果未成功获取锁,检查剩余时间是否已过期
nanosTimeout = deadline - System.nanoTime();
// 【8】如果过期则取消等待并返回false
if (nanosTimeout <= 0L) {
//【9】取消申请
cancelAcquire(node);
return false;
}
// 【10】如果未过期且满足条件,则挂起当前线程一段时间(1000L纳秒)
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanosTimeout);
// 【11】如果线程被中断,则抛出InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
}
} catch (Throwable t) {
//【12】异常处理:捕获并处理可能抛出的异常,取消等待并重新抛出异常。取消申请
cancelAcquire(node);
throw t;
}
}
代码分析:
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// 【1】回调CountDownLatch#Sync的tryReleaseShared方法,返回倒计时是否等于0的判断
if (tryReleaseShared(arg)) {
// 【2】如果倒计时到0了,则进行释放倒计时
doReleaseShared();
return true;
}
return false;
}
代码分析:
【1】回调CountDownLatch#Sync的tryReleaseShared方法,返回倒计时是否等于0的判断
【2】如果倒计时到0了,则进行释放倒计时
源码分析:
【1】会回调CountDownLatch#Sync的tryReleaseShared方法
protected boolean tryReleaseShared(int releases) {
// 倒计时是否到0了
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
【2】 doReleaseShared();
private void doReleaseShared() {
//【1】在一个无限循环中,检查当前头节点是否不为空且不是尾节点。
for (;;) {
Node h = head;
//【2】检查头节点状态
if (h != null && h != tail) {
int ws = h.waitStatus;
//【3】如果头节点的状态为 SIGNAL,尝试将其状态设置为 0 并唤醒其后继节点。
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//【4】如果头节点的状态为 0,尝试将其状态设置为 PROPAGATE 以确保释放操作继续传播
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
await()
时要注意处理InterruptedException。CountDownLatch 与其他同步工具的区别
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。