条件变量将因不同条件而无法推进的线程分别阻塞在不同的条件队列上,可以精细控制线程同步,降低惊群效应。
//ExclusiveNode、SharedNode、ConditionNode都继承此类
abstract static class Node {
volatile Node prev; // initially attached via casTail
volatile Node next; // visibly nonnull when signallable
Thread waiter; // visibly nonnull when enqueued
volatile int status; // written by owner, atomic bit ops by others
}
static final class ConditionNode extends Node
implements ForkJoinPool.ManagedBlocker {
ConditionNode nextWaiter; // link to next waiting node
}
//条件队列
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient ConditionNode firstWaiter;
/** Last node of condition queue. */
private transient ConditionNode lastWaiter;
}
线程执行await后就会进入条件队列,等被唤醒时重新进入同步队列。
signal会唤醒条件队列上的首个线程,而signalAll会唤醒全部线程,唤醒流程如下:
public final void signal() {
ConditionNode first = firstWaiter;
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
if (first != null)
doSignal(first, false);
}
public final void signalAll() {
ConditionNode first = firstWaiter;
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
if (first != null)
doSignal(first, true);
}
//AQS
private void doSignal(ConditionNode first, boolean all) {
while (first != null) {
ConditionNode next = first.nextWaiter;
if ((firstWaiter = next) == null)
lastWaiter = null;
if ((first.getAndUnsetStatus(COND) & COND) != 0) {//取消COND状态
enqueue(first);//转入同步队列
if (!all)
break;
}
first = next;
}
}
final void enqueue(Node node) {
if (node != null) {
for (;;) {
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null) // initialize
tryInitializeHead();
else if (casTail(t, node)) {
t.next = node;
if (t.status < 0) // wake up to clean link
LockSupport.unpark(node.waiter);
break;
}
}
}
}
await流程如下:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
ConditionNode node = new ConditionNode();
int savedState = enableWait(node);//加入条件队列
LockSupport.setCurrentBlocker(this); // for back-compatibility,将AQS对象设置到thread中
boolean interrupted = false, cancelled = false, rejected = false;
while (!canReacquire(node)) {//如果被唤醒进入同步队列后就可以跳出循环
if (interrupted |= Thread.interrupted()) {
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break; // else interrupted after signal
} else if ((node.status & COND) != 0) {
try {
if (rejected)
node.block();
else
ForkJoinPool.managedBlock(node);//阻塞线程,最终会调用LockSupport.park()
} catch (RejectedExecutionException ex) {
rejected = true;
} catch (InterruptedException ie) {
interrupted = true;
}
} else
Thread.onSpinWait(); // awoke while enqueuing
}
//被唤醒
LockSupport.setCurrentBlocker(null);
node.clearStatus();//
//lock.lock()方法:acquire(null, arg, false, false, false, 0L);
//重新获取锁时已原来的savedState
acquire(node, savedState, false, false, false, 0L);//重新获取锁,此时该节点已经进入了同步队列,有可能直接tryAcquire成功跳出循环,也可能需要两次循环修改node.status为WAITING、park。
if (interrupted) {
if (cancelled) {
unlinkCancelledWaiters(node);
throw new InterruptedException();
}
Thread.currentThread().interrupt();
}
}
enableWait方法需要保存线程此时现场状态用于将来恢复,加入条件队列并释放锁。
/**
* Adds node to condition list and releases lock.
*
* @param node the node
* @return savedState to reacquire after wait
*/
private int enableWait(ConditionNode node) {
if (isHeldExclusively()) {//Sync 判断是否是持有锁的线程
node.waiter = Thread.currentThread();
node.setStatusRelaxed(COND | WAITING);//设置标志
//加入条件队列
ConditionNode last = lastWaiter;
if (last == null)
firstWaiter = node;
else
last.nextWaiter = node;
lastWaiter = node;
//缓存状态用于恢复
int savedState = getState();
if (release(savedState))//AQS.release释放锁
return savedState;
}
node.status = CANCELLED; // lock not held or inconsistent
throw new IllegalMonitorStateException();
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。