#前言
好久没写blog了,最近学车,时间真的少了很多,花了一些时间写了一篇AQS,请大家指正。
翻阅AbstractQueuedSynchronizer的源码,会发现如下注释:
Pprovides a framework for implementing blocking locks and related
synchronizers (semaphores, events, etc) that rely on
first-in-first-out (FIFO) wait queues.
AbstractQueuedSynchronizer提供一个基于FIFO队列的框架,该框架用于实现阻塞锁和相关同步器(例如:semaphores)。
如此可知,AbstractQueuedSynchronizer可以视为JDK同步器的框架,理解它,有助于理解JDK的同步器。
本人依据JDK源码中的注释结合并发经验,总结了如下AQS框架说明:
//JDK中的源码
public final void acquire(int state) {
if (!tryAcquire(state) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), state))
selfInterrupt();
}
其对应代码的语义为:
while (!获取不成功) {
如果当前线程不在队列中, 加入队列
阻塞当前线程
}
即阻塞直到获取成功。
//JDK中的源码
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
对应代码的语义为:
if (尝试释放成功)
解锁队列中的第一个线程
如果当前节点为队列中的第一个节点,尝试获取,获取成功进行head后续节点的设置。如获取失败维护前后节点关系,若需要阻塞进行阻塞,之后继续重试。 若出现异常获取失败,取消当前节点获取操作。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)//尝试获取失败
doAcquireShared(arg);//进行共享式获取
}
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {//当前节点的先驱节点为head,即当前节点为第一个
int r = tryAcquireShared(arg);
if (r >= 0) {//尝试获取成功
//向上冒泡,保证head节点的后驱节点为未获取到的节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//包装为 获取失败的节点 若需要中断进行中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
确保级联释放,即使有其他的线程正在进行的获取/释放。 这个过程通常尝试释放head的后续节点,如果他需要被释放。 如果该节点不需要,会向下传递释放动作,直到释放成功。 此外,我们必须在添加新节点时进行循环处理。不同于其他操作 中释放后续节点,我们需要知道CAS是否重置了状态,所以我们需要重复检查。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
/*
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // 循环检查状态
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // 循环检查CAS
}
if (h == head) // 循环检查是否有新节点
break;
}
}
在不可重入锁Mutex中 ,我们使用state=0表示释放,state=1表示获取
class Mutex implements Lock, java.io.Serializable {
// 内部助手同步类Sync
private static class Sync extends AbstractQueuedSynchronizer {
// 当state=1表示获取了独占锁
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 如果state=0,锁是释放状态,尝试获取
public boolean tryAcquire(int acquires) {
assert acquires == 1; // acquires为1表示进行获取操作,其他值无效
if (compareAndSetState(0, 1)) {//CAS操作
setExclusiveOwnerThread(Thread.currentThread());//设置锁的持有者为当前线程
return true;
}
return false;
}
//尝试释放
protected boolean tryRelease(int releases) {
assert releases == 1; // 传入的值为1表示进行释放,其他值无效
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);//设置状态为0,表示锁已释放
return true;
}
// 提供一个条件谓词
Condition newCondition() { return new ConditionObject(); }
// 反序列化属性
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); //设置初始状态为释放
}
}
// 所有同步操作 委托给Sync,下面我们实现必要的锁需要的操作
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
state=0表示未被通知(等待中,不可共享获取),state!=0表示被通知(可共享获取)
class BooleanLatch {
//内部同步器,state=0表示未被通知(等待中,不可共享获取),state!=0表示被通知(可共享获取)
private static class Sync extends AbstractQueuedSynchronizer {
boolean isSignalled() { return getState() != 0; }
/**
*tryAcquireShared 返回负值 获取失败
*0 获取成功其他线程不能获取
*正值获取成功,其他线程也可获取成功
/
protected int tryAcquireShared(int ignore) {
return isSignalled() ? 1 : -1;
}
protected boolean tryReleaseShared(int ignore) {
setState(1);
return true;
}
}
private final Sync sync = new Sync();
public boolean isSignalled() { return sync.isSignalled(); }
public void signal() { sync.releaseShared(1); }
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
}
分析JDK中的同步类,除了了解AQS外,还要知道每个同步器中的state的语义是什么,AQS上边已经分析了,下面介绍下几个同步器的state的语义。
ReentrantLock 只支持独占方式的获取操作,它实现了tryAcquire,tryRelease和isHeldExclusively. ReentrantLock的状态用于存储锁获取的操作次数,同一线程每获取一次加1,每释放一次减少1. tryAcquire代码简要分析
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// 当前状态值(即锁获取的操作)>0,锁的所有者非当前线程,获取失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)//如果状态值饱和,获取失败,即超过最大可获取线程数
throw new Error("Maximum lock count exceeded");
//符合获取锁的条件,更新状态值,
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//设置锁的持有者为当前线程
setExclusiveOwnerThread(current);
return true;
}
CountDownLatch同步状态保存当前的计数值。类似BooleanLatch,不做分析。 Semaphore的同步状态用于存储当前可以许可的数量。 Semaphore中的tryAcquireShared,tryReleaseShared tryAcquireShared,获取当前可用许可数量,若可用许可数量大于申请数量,通过compareAndSetState设置新的剩余许可数量,否则获取失败。 tryReleaseShared获取当前可用许可数量,如果当前剩余许可数量+释放数量>0,过compareAndSetState设置新的剩余许可数量,否则获取失败。
/**
*tryAcquireShared,获取当前可用许可数量,若可用许可数量大于申请数量,通过compareAndSetState设置新的剩余许可数量,否则获取失败。
*/
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
/**
*
*tryReleaseShared获取当前可用许可数量,如果当前剩余许可数量+释放数量>0,过compareAndSetState设置新的剩余许可数量,否则获取失败。
*/
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
FutueTask的同步器状态值如下:
NEW = 0; //初始状态
COMPLETING = 1; //运行中
NORMAL = 2; //完成
EXCEPTIONAL = 3; //异常
CANCELLED = 4; //已取消
INTERRUPTING = 5; //中断中
INTERRUPTED = 6; //已中断
可能的状态转换
NEW(初始状态) -> COMPLETING(运行中) -> NORMAL (已完成) NEW(初始状态) -> COMPLETING(运行中) -> EXCEPTIONAL (异常) NEW (初始状态)-> CANCELLED (已取消) NEW (初始状态)-> INTERRUPTING (中断中)-> INTERRUPTED (已中断)
Future.get的语义非常类似闭锁,如果发生了某件事件(由FutureTask表示的任务执行完成 或者取消),那么线程可以恢复执行,否则一致阻塞。
AQS是JDK并发的框架,仔细理解有助于理解JDK的同步工具。 对于JDK的部分同步类,进行了简要说明,详细自行查阅源码。 对于JDK同步类的源码建议进行如下步骤: 1.理解同步器的状态值的语义 2.该同步器使用是AQS的什么模式, 是共享,互斥,还是共享与互斥都有。 3.优先理解同步器的tryAcquire,tryAcquireShared,tryRelease,tryReleaseShared方法,之后查看其它方法。