摘要:
前面我们已经介绍和分析了管程,以及 JVM 层面的管程而 AQS 则是 Java 并发包中管程的一种实现。
下面是 AQS 的类实现关系图
// 头结点
private transient volatile Node head;
// 尾节点
private transient volatile Node tail;
// 同步状态
private volatile int state;
// AbstractOwnableSynchronizer.class
// 当前持有独占锁的线程,类似对象头的Thread ID,可以用来判断是否为重入
private transient Thread exclusiveOwnerThread;
// 节点状态,有下面几种取值
volatile int waitStatus;
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 当前线程取消了锁的竞争
static final int CANCELLED = 1;
// 后继节点需要被唤醒
static final int SIGNAL = -1;
// 当前节点线程正在等待条件
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 双向链表
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 节点封装的线程
volatile Thread thread;
// 条件队列的单向链表
Node nextWaiter;
回到问题 TOP 1 ,可以分析得到 AQS 的数据结构是一个双向链表,并维护了一个全局状态
tryAcquire
这个方法 AQS 是定义了一个空方法,交由子类自行实现,这里也是采用了 模板设计模式
我们先暂时理解 tryAcquire
是尝试获取一下锁,后面会结合具体实现类来分析
可以看到如果 tryAcquire(arg)
返回 true,方法就结束了,如果返回 false 则往下走加入队列,加入成功就设置中断状态
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
接下来我们来分析 addWaiter(Node.EXCLUSIVE)
如果你看过之前分析 Synchronized
那一篇的重量锁阶段相信你看到这会发现很熟悉,没错!就是在竞争失败后把当前线程封装到 node 节点(独占模式),与 Synchronized
不同的是 node 不是放在队列头部而是塞到队列的队尾处。
核心逻辑就是通过 CAS 自旋进行入队。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 队列不为空 直接 CAS 入队,成功直接返回
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 队列为空 或 CAS竞争失败 调用 enq 自旋入队
enq(node);
return node;
}
我们来看下 enq(node) 的逻辑,很简单就是:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquire(arg) 中的 addWaiter(Node.EXCLUSIVE) 已经分析完了,接下来我们分析外层的 acquireQueued() 方法。
上面逻辑已经把节点放入队列了,接下来的逻辑就是会把放入队列的节点不断获取锁,直到成功或者中断
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 如果前驱节点是队头就尝试获取锁,因为这个节点有可能是刚初始化的
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
// 注意,唯一 return 跳出方法的地方
return interrupted;
}
// 说明要么上面分支没获取到锁,要么不是头节点
// 接下来我们分析 shouldParkAfterFailedAcquire
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 根据 return 之前的赋值可以知道,failed 为 true 只有 tryAcquire 异常时候会出现
if (failed)
// 将 node 节点设置为 CANCELLED 状态
cancelAcquire(node);
}
}
接下来我们分析 shouldParkAfterFailedAcquire(p, node) 方法,注意传过来的第一个节点是 前驱节点 ,第二个是当前节点
该方法核心思想就是判断当前线程是否应该被挂起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前驱节点是唤醒状态
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// ws>0 代表前驱节点取消了排队
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
// 因为依赖前驱节点的唤醒,所以前驱不能是取消状态,再往前找,一直找到前驱不是取消状态的才停止
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
// 排除 ws = -1 和 ws > 0,加上前面初始化节点并没有看见设置 waitStatus
// 所以进入这个分支的也就是 waitStatus 为0
// 使用 CAS 把前驱节点状态设置为 唤醒状态,再次循环时候就会从第一个分支 return true
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
如果上个方法返回为 true ,就会接着调用 park 挂起当前线程。
正常来说 第一次都会为 false,因为 第一次只是设置状态,第二次才会校验状态
我们主要拿 ReentrantLock
和 Semaphore
的实现来举例子
ReentrantLock
有两个版本的实现,一个是公平锁,一个是非公平锁
我们先来看非公平锁,并没有什么特殊处理,就是先尝试获取一下,成功就返回,失败就入 AQS 的等待队列
ReentrantLock.Sync#nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// c 为 0,说明还没有线程持有锁
if (c == 0) {
// 尝试 CAS 一下,成功就直接返回
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果有线程且是当前线程,说明是重入锁,state+1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
接下来我们来看公平锁,可以看到在非公平的基础上多判断一次 hasQueuedPredecessors
, hasQueuedPredecessors
的逻辑很简单,就是判断队列为不为空,如果不为空说明还有等待的,就不往下走了
ReentrantLock.FairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 队列为空,就 CAS 尝试获取一下锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 队列不为空返回 true
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
Semaphore
是 Java 层面实现的一个信号量,也是分为公平和非公平版本,Semaphore
也是基于 AQS 来实现的,它是通过一个许可介质,获取许可就把许可减少,如果许可数小于0,就入队列阻塞等待许可的归还;归还许可的时候就把许可数增加。信号量这块可以参考前面的文章,有专门讲解这个机制。
我们先来看非公平的实现,其实就是对许可的减少
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 获取许可数量
int available = getState();
// 减去当前要获取的许可的剩余的许可数量
int remaining = available - acquires;
// CAS 修改许可的数量,如果小于0,则返回负数,在上一层调用的时候如果为负数会加入 AQS 的队列
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
再来看看公平锁的实现,相信看到 hasQueuedPredecessors
你又懂了
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
同 tryAcquire
一样, tryRelease
也是 AQS 中的一个模板方法,我们后面会分析 tryRelease
的具体实现,我们先来分析 release
同获取锁 acquire
不一样,这里是子类实现返回 true才往下走(后面我们会知道,这个是指是否完全释放),后面会调用 unparkSuccessor
方法来唤醒后继节点,需要注意的是传入的节点是 head 节点
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
unparkSuccessor 方法核心逻辑就是唤醒 head 的后继节点,如果后继节点的状态不是需要被唤醒的状态,就从后往前找到 waitStatus 是唤醒状态的最前面的节点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 有可能后继节点取消了等待
if (s == null || s.waitStatus > 0) {
s = null;
// 从后往前遍历
for (Node t = tail; t != null && t != node; t = t.prev)
// 找到最前面的一个waitStatus <= 0的节点,赋值给 s 等待被唤醒
if (t.waitStatus <= 0)
s = t;
}
// 后继节点正常且不为空就唤醒
if (s != null)
LockSupport.unpark(s.thread);
}
同样的,我们主要拿 ReentrantLock
和 Semaphore
的实现来举例子
先来看 ReentrantLock
的实现,如果可重入的次数已经减少完了就返回true,走 AQS 的模板方法调用唤醒操作
ReentrantLock.Sync#tryRelease
protected final boolean tryRelease(int releases) {
// 减少可重入次数
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 这里考虑到了是否为重入锁,也就是是否完全释放,
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 更新状态数
setState(c);
return free;
}
然后来看下 Semaphore
的实现,更新许可,成功则返回 true,走 AQS 的模板进行唤醒操作
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 当前许可数量,因为有可能有竞争,所以每次自旋后重新获取许可数量进行归还
int current = getState();
// 新增许可数量
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// CAS 更新许可数量,失败就重试
if (compareAndSetState(current, next))
return true;
}
}
通过对核心方法的分析,我们可以知道 AQS 定义了很多模板方法,扩展逻辑交由子类实现。
回到问题 TOP 2 ,可以知道采用的设计模式是模板设计模式
由上面对成员变量和核心方法的分析,我们可以看到 AQS 这个管程的实现其实和概念上是相同的,就是对队列和状态值的一个维护,也可以明白 Java 为什么使用管程为核心实现同步,其优势是面向对象,把复杂逻辑封装起来,对于使用更友好。
同样我们对 Semaphore
这个 Java 层面的信号量实现的分析,也明白了管程那篇文章中写的 管程和信号量等价 ,因为我们可以使用管程来实现信号量,也可以使用信号量来实现管程,只是管程对我们更加友好!
以上的总结也是对问题 TOP 3 的一个回答。