AQS,全称:AbstractQueuedSynchronizer,是JDK提供的一个同步框架,内部维护着FIFO双向队列,即CLH同步队列。
AQS依赖它来完成同步状态的管理(voliate修饰的state,用于标志是否持有锁)。如果获取同步状态state失败时,会将当前线程及等待信息等构建成一个Node,将Node放到FIFO队列里,同时阻塞当前线程,当线程将同步状态state释放时,会把FIFO队列中的首节的唤醒,使其获取同步状态state。
很多JUC包下的锁都是基于AQS实现的
如下脑图:
static final class Node {
/** 共享节点 */
static final Node SHARED = new Node();
/** 独占节点 */
static final Node EXCLUSIVE = null;
/** 因为超时或者中断,节点会被设置成取消状态,被取消的节点不会参与到竞争中,会一直是取消
状态不会改变 */
static final int CANCELLED = 1;
/** 后继节点处于等待状态,如果当前节点释放了同步状态或者被取消,会通知后继节点,使其得以
运行 */
static final int SIGNAL = -1;
/** 节点在等待条件队列中,节点线程等待在condition上,当其他线程对condition调用了signal
后,该节点将会从等待队列中进入同步队列中,获取同步状态 */
static final int CONDITION = -2;
/**
* 下一次共享式同步状态获取会无条件的传播下去
*/
static final int PROPAGATE = -3;
/** 等待状态 */
volatile int waitStatus;
/** 前驱节点 */
volatile Node prev;
/** 后继节点 */
volatile Node next;
/** 获取同步状态的线程 */
volatile Thread thread;
/**
* 下一个条件队列等待节点
*/
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
/**
* 独占式获取同步状态
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
尝试去获取锁,获取成功返回true,否则返回false。该方法由继承AQS的子类自己实现。采用了模板方法设计模式。
如:ReentrantLock的Sync内部类,Sync的子类:NonfairSync和
FairSync
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
private Node addWaiter(Node mode) {
// 新建Node节点
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速添加尾结点
Node pred = tail;
if (pred != null) {
node.prev = pred;
// CAS方式设置尾结点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果上面添加失败,这里循环尝试添加,直到添加成功为止
enq(node);
return node;
}
private Node enq(final Node node) {
// 一直for循环,直到插入Node成功为止
for (;;) {
Node t = tail;
if (t == null) {
// CAS设置首节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// CAS设置尾结点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
// 操作是否成功标志
boolean failed = true;
try {
// 线程中断标志
boolean interrupted = false;
// 不断的自旋循环
for (;;) {
// 当前节点的prev节点
final Node p = node.predecessor();
// 判断prev是否是头结点 && 是否获取到同步状态
if (p == head && tryAcquire(arg)) {
// 以上条件成立,将当前节点设置成头结点
setHead(node);
// 将prev节点移除队列中
p.next = null; // help GC
failed = false;
return interrupted;
}
// 自旋过程中,判断当前线程是否需要阻塞 && 阻塞当前线程并且检验线程中断状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 取消获取同步状态
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 拿到当前节点的prev节点的等待状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 如果prev的status是signal,表示当prev释放了同步状态或者取消了,会通知当前节
* 点,所以当前节点可以安心的阻塞了(相当睡觉会有人叫醒他)
*/
return true;
if (ws > 0) {
/*
* status > 0,表示为取消状态,需要将取消状态的节点从队列中移除
* 直到找到一个状态不是取消的节点为止
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 除了以上情况,通过CAS将prev的status设置成signal
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 阻塞当前线程
LockSupport.park(this);
// 返回当前线程的中断状态
return Thread.interrupted();
}
static void selfInterrupt() {
// 未获取到同步状态 && 线程中断状态是true,中断当前线程
Thread.currentThread().interrupt();
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 如果头结点不为空 && 并且 头结点状态不为0
// 唤醒头结点的后继节点
unparkSuccessor(h);
return true;
}
return false;
}
尝试去释放同步状态,释放成功返回true,否则返回false。该方法由继承AQS的子类自己实现。采用了模板方法设计模式。
如:ReentrantLock的Sync内部类的tryRelease方法。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
private void unparkSuccessor(Node node) {
/*
* 获取当前节点状态
*/
int ws = node.waitStatus;
// 如果当前节点的状态小于0,那么用CAS设置成0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 获取当前节点的后继节点
*/
Node s = node.next;
// 如果后继节点为空 || 或者后继节点的状态 > 0 (为取消状态)
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾结点查找状态不为取消的可用节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒后继节点
LockSupport.unpark(s.thread);
}
在AQS中维护着一个FIFO的同步队列,当线程获取同步状态失败后,则会加入到这个CLH同步队列的对尾并一直保持着自旋。在CLH同步队列中的线程在自旋时会判断其前驱节点是否为首节点,如果为首节点则不断尝试获取同步状态,获取成功则退出CLH同步队列。当线程执行完逻辑后,会释放同步状态,释放后会唤醒其后继节点。
共享式与独占式的最主要区别在于同一时刻独占式只能有一个线程获取同步状态,而共享式在同一时刻可以有多个线程获取同步状态。例如读操作可以有多个线程同时进行,而写操作同一时刻只能有一个线程进行写操作,其他操作都会被阻塞。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
// 获取失败,自旋获取同步状态
doAcquireShared(arg);
}
尝试去获取共享锁,获取成功返回true,否则返回false。该方法由继承AQS的子类自己实现。采用了模板方法设计模式。
如:ReentrantReadWriteLock的Sync内部类
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
private void doAcquireShared(int arg) {
// 添加共享模式节点到队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
// 自旋获取同步状态
for (;;) {
// 当前节点的前驱
final Node p = node.predecessor();
// 如果前驱节点是head节点
if (p == head) {
// 尝试去获取共享同步状态
int r = tryAcquireShared(arg);
if (r >= 0) {
// 将当前节点设置为头结点,并且释放也是共享模式的后继节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
// 真正的释放共享同步状态,并唤醒下一个节点
doReleaseShared();
}
}
private void doReleaseShared() {
// 自旋释放共享同步状态
for (;;) {
Node h = head;
// 如果头结点不为空 && 头结点不等于尾结点,说明存在有效的node节点
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果头结点的状态为signal,说明存在需要唤醒的后继节点
if (ws == Node.SIGNAL) {
// 将头结点状态更新为0(初始值状态),因为此时头结点已经没用了
// continue为了保证替换成功
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒后继节点
unparkSuccessor(h);
}
// 如果状态为初始值状态0,那么设置成PROPAGATE状态
// 确保在释放同步状态时能通知后继节点
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
private void unparkSuccessor(Node node) {
/*
* 获取当前节点状态
*/
int ws = node.waitStatus;
// 如果当前节点的状态小于0,那么用CAS设置成0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 获取当前节点的后继节点
*/
Node s = node.next;
// 如果后继节点为空 || 或者后继节点的状态 > 0 (为取消状态)
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾结点查找状态不为取消的可用节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒后继节点
LockSupport.unpark(s.thread);
}
熬夜到凌晨,终于搞定了这篇文章,晚安,老铁们
云服务器,云硬盘,数据库(包括MySQL、Redis、MongoDB、SQL Server),CDN流量包,短信流量包,cos资源包,消息队列ckafka,点播资源包,实时音视频套餐,网站管家(WAF),大禹BGP高防(包含高防包及高防IP),云解析,SSL证书,手游安全MTP,移动应用安全、 云直播等等。
给个[在看],是对IT老哥最大的支持