点击上方疾风先生可以订阅哦
AQS基础概念与作用
AQS核心组件
// Node 部分代码,在AQS内部中定义
static final class Node{
// 锁的mode
static final Node SHARED = new Node(); // 表示持有共享锁
static final Node EXCLUSIVE = null; // 持有独占锁
// waiter status,即节点所处的阻塞状态列表如下
static final int CANCELLED = 1; // 被取消,意味着放弃竞争锁资源,移出阻塞队列
static final int SIGNAL = -1; // 持有锁状态
static final int CONDITION = -2; // 仅用于条件变量,线程处于条件等待队列中,也就是condition.await让线程挂起
static final int PROPAGATE = -3; // 仅用于共享锁,表示已经释放锁并已唤醒下一个阻塞节点的线程状态
volatile int waitStatus; // 当前节点的状态,初始化为0,不属于上述任何一种状态,属于可竞争获取锁的状态
// 实现双端链表
volatile Node prev;
volatile Node next;
// 当前节点的线程
volatile Thread thread;
// 标志当前节点是共享锁还是独占锁,用节点指针引用指向对应的mode
Node nextWaiter;
// 独占锁:nextWaiter = null & thread = Thread.currentThread;
// 共享锁:nextWaiter = SHARED & thread = Thread.currentThread;
// 不具备上述条件属于正常对象,不持有锁状态
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
// ConditionObject部分代码,在AQS内部中定义
public class ConditionObject implements Condition,Serializable {
// 定义队列的首尾节点,内部自定义队列实现wait queue的操作
private transient Node firstWaiter;
private transient Node lastWaiter;
private static final int REINTERRUPT = 1; // AQS内部调用Thread.interrupt()中断当前线程
private static final int THROW_IE = -1; // 直接向当前线程抛出中断异常
// implements Condition methods ...
// 调用await,当前线程挂起
await(){}
// 调用signal,当前线程被唤醒
signal(){}
signalAll(){}
// 挂起/唤醒超时方法 ...
// 内部核心方法
// 添加一个新的节点到当前条件阻塞队列中
private Node addConditionWaiter() {
// ...
}
// 对于已经cancelled的waiter移出队列
private void unlinkCancelledWaiters() {
// ...
}
// 唤醒持有相同的Condition的线程去争抢资源锁,获取到锁并通过CAS设置对应节点的waiter status为SIGNAL
private void doSignalAll(Node first) {
// ...
}
}
// AQS.java
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 自定义实现双向FIFO的队列, 定义队列的head & tail
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state; // 当前AQS同步状态信息,具体实现类含义可能不一样
// CAS操作上述的state,head&tail, waitstuts
private static final Unsafe unsafe = Unsafe.getUnsafe();
// 除了获取/释放锁之外,内部定义核心方法
private Node enq(final Node node) {} // 将Node插入队列
private Node addWaiter(Node mode) {} // 创建想要持有mode的锁方式的Node并将其添加到阻塞队列中
// 内部实现的独占锁方式
final boolean acquireQueued(final Node node, int arg) {} // 获取独占锁,失败则挂起
private void unparkSuccessor(Node node) {} // 唤醒当前Node的下一个节点的线程
// 内部实现共享锁
private void doReleaseShared() {} // 唤醒下一个节点的线程并通知其他节点的线程已经释放锁
private void doAcquireShared(int arg) {} // 如果获取锁失败则挂起线程,成功则持有锁
}
AQS工作原理
独占锁资源接口
- 模板方法: acquire/release
- 具体实现方法: tryAcquire/tryRelease
共享锁资源接口
- 模板方法: acquireShared/releaseShared
- 具体实现方法: tryAcquireShared/tryReleaseShared
获取锁
- acquire/acquireShared定义资源争抢锁逻辑,没有拿到则加入队列等待池中等待
- tryAcquire/tryAcquireShared实际执行占用资源的操作,交由具体实现的AQS完成
释放锁
- release/releaseShared定义释放资源的逻辑,释放后唤醒等待池下一个的节点线程
- tryRelease/tryReleaseShared实际释放资源的操作,由具体实现的AQS完成
PROPAGATE
来控制,也就是拥有这个状态的wait status说明已经释放锁并唤醒其他线程争抢锁PROPAGATE
来控制,告知节点已经释放锁并通知其他线程自定义AQS
// DefineAQS.java
public abstract class DefineAQS {
final static class AQSNode{
final static int SHARED = 9999;
final static int EXCLUSIVE = -9999;
private int mode;
private volatile Thread thread;
public AQSNode(int mode){
this.thread = Thread.currentThread();
this.mode = mode;
}
public Thread getThread() {
return thread;
}
public int getMode() {
return mode;
}
}
private AtomicInteger state = null;
private AtomicReference<Thread> exclusiveOwnerThread = new AtomicReference<>();
private LinkedBlockingQueue<AQSNode> waiters = new LinkedBlockingQueue<>();
public AtomicInteger getState() {
return state;
}
public void setState(int state) {
this.state = new AtomicInteger(state);
}
public void compareAndSetState(int expect, int update){
this.state.compareAndSet(expect, update);
}
public void acquire(int arg){
// 加入阻塞队列中
AQSNode node = new AQSNode(AQSNode.EXCLUSIVE);
waiters.offer(node);
while (!tryAcquire(arg)){
LockSupport.park(node.getThread());
}
// 当前线程已经获取到锁,移出阻塞队列,通知后续节点
waiters.remove(node);
}
public void release(int arg){
if (tryRelease(arg)) {
while (true){
AQSNode node = waiters.peek();
if (node.getMode() == AQSNode.EXCLUSIVE){
LockSupport.unpark(node.getThread());
break;
}
}
}
}
public void acquireShared(int arg){
AQSNode node = new AQSNode(AQSNode.SHARED);
waiters.offer(node);
while (tryAcquireShared(arg) < 0){
LockSupport.park(node.getThread());
}
waiters.remove(node);
}
public void releaseShared(int arg){
if (tryReleaseShared(arg) > 0){
while (true){
AQSNode node = waiters.peek();
if (node.getMode() == AQSNode.SHARED){
LockSupport.unpark(node.getThread());
break;
}
}
}
}
// abstract method :
你好,我是疾风先生,先后从事外企和互联网大厂的java和python工作, 记录并分享个人技术栈,欢迎关注我的公众号,致力于做一个有深度,有广度,有故事的工程师,欢迎成长的路上有你陪伴,关注后回复greek可添加私人微信,欢迎技术互动和交流,谢谢!