Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场

AQS

原创
作者头像
在下是首席架构师
发布于 2022-08-01 13:19:24
发布于 2022-08-01 13:19:24
31800
代码可运行
举报
文章被收录于专栏:从入门到出门从入门到出门
运行总次数:0
代码可运行

AQS全称是AbstractQueuedSynchronizer,形如其名,抽象队列同步器。

AQS定义了两种资源共享模式:

  • 独占式,每次只能有一个线程持有锁,例如ReentrantLock实现的就是独占式的锁资源。
  • 共享式,允许多个线程同时获取锁,并发访问共享资源,ReentrantWriteLockCountDownLatch等就是实现的这种模式。

它维护了一个volatile修饰的state变量和一个FIFO(先进先出)的队列。 其中state变量代表的是竞争资源标识,而队列代表的是竞争资源失败的线程排队时存放的容器

AQS中提供了操作state的方法:

  • getState();
  • setState();
  • compareSetState();
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected final int getState() {
    return state;
}
protected final void setState(int newState) {
    state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

因为AbstractQueuedSynchronizer是一个抽象类,他采用模板方法的设计模式,规定了独占共享模式需要实现的方法,并且将一些通用的功能已经进行了实现,所以不同模式的使用方式,只需要自己定义好实现共享资源的获取与释放即可,至于具体线程在等待队列中的维护(获取资源入队列、唤醒出队列等),AQS已经实现好了。

可以重写的方法有如下几个:

  • isHeldExclusively();// 是否为独占模式;但是只有使用到了Condition的,才需要去实现它。例如:ReentrantLock。
  • boolean tryAcquire(int arg); // 独占模式;尝试获取资源,成功返回true,失败返回false。
  • boolean tryRelease(int arg) ; // 独占模式;尝试释放资源,成功返回true,失败返回false。
  • int tryAcquireShared(int arg); // 共享模式;尝试获取资源,负数表示失败;0表示成功,但是没有剩余可用资源了;正数表示成功,且有剩余可用资源。
  • boolean tryReleaseShared(int arg) ; // 共享模式;尝试释放资源,若释放资源后允许唤醒后续等待节点返回true,否则返回false。

上面的这几个方法在AbstractQueuedSynchronizer这个抽象类中,都没有被定义为abstract的,说明这些方法都是可以按需实现的,共享模式下可以只实现共享模式的方法(例如CountDownLatchSemaphore),独占模式下可以只实现独占模式的方法(例如ReentrantLock),也支持两种都实现,两种模式都使用(例如ReentrantReadWriteLock)。

AQS源码分析

我们先简单介绍AQS的两种模式的实现类的代表ReentrantLock独占模式 )和CountDownLatch共享模式 ),是如何来共享资源的一个过程,然后再详细通过AQS的源码来分析整个实现过程。

  • ReentrantLock在初始化的时候state=0,表示资源未被锁定。当A线程执行lock()方法时,会调用tryAcquire()方法,将AQS中队列的模式设置为独占,并将独占线程设置为线程A,以及将state+1。 这样在线程A没有释放锁前,其他线程来竞争锁,调用tryAcquire()方法时都会失败,然后竞争锁失败的线程就会进入到队列中。当线程A调用执行unlock()方法将state=0后,其他线程才有机会获取锁(注意ReentrantLock是可重入的,同一线程多次获取锁时state值会进行垒加的,在释放锁时也要释放相应的次数才算完全释放了锁)。
  • CountDownLatch会将任务分成N个子线程去执行,state的初始值也是N(state与子线程数量一致)。N个子线程是并行执行的,每个子线程执行完成后countDown()一次,state会通过CAS方式减1。直到所有子线程执行完成后(state=0),会通过unpark()方法唤醒主线程,然后主线程就会从await()方法返回,继续后续操作。

独占模式分析

AbstractQueuedSynchronizer 的类里面有一个静态内部类 Node,它代表的是队列中的每一个节点。 其 Node 类中有如下几个属性:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 节点的状态
volatile int waitStatus;
// 当前节点的前一个节点
volatile Node prev;
// 当前节点的后一个节点
volatile Node next;
// 当前节点中所包含的线程对象
volatile Thread thread;
// 等待队列中的下一个节点
Node nextWaiter;

其中 Node 类中还有几个常量,代表了几个节点的状态(waitStatus)值。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED =  1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL    = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
  * waitStatus value to indicate the next acquireShared should
  * unconditionally propagate
*/
static final int PROPAGATE = -3;

首先节点的状态值waitStatus默认是0,然后下面几个常量有自己具体的含义。 CANCELLED = 1; 代表的是当前节点从同步队列中取消,当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。 SIGNAL = -1; 代表后继节点处于等待状态。后继结点入队时,会将前继结点的状态更新为SIGNAL。 CONDITION = -2; 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了 signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中。 PROPAGATE = -3; 表示在共享模式下,前继节点在释放资源后会唤醒后继节点,并将这种共享模式传播下去。

通过上面几个固定的常量值,我们可以看出节点状态中通常负数值通常表示节点处于有效的等待状态,而正数值代表节点已经被取消了。

所以AQS源码中有很多地方都用waitStatus>0waitStatus<0这种方式来判断队列中节点的是否正常。

独占模式下,只能有一个线程占有锁资源,其他竞争资源的线程,在竞争失败后都会进入到等待队列中,等待占有锁资源的线程释放锁,然后再重新被唤醒竞争资源。

ReentrantLock加锁过程

ReentrantLock默认是非公平锁,就是说,线程在竞争锁的时候并不是按照先来后到的顺序来获取锁的,但是ReentrantLock也是支持公平锁的,在创建的时候传入一个参数值即可。 下面我们以ReentrantLock默认情况下的加锁来分析AQS的源码。 ReentrantLock并没有直接继承AQS类,而是通过内部类来继承AQS类的。 我们在用ReentrantLock加锁的时候都是调用的用lock()方法,那么我们来看看默认非公平锁下,lock()方法的源码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * Sync object for non-fair locks
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

通过上面源码可以看到,lock()方法,首先是通过CAS的方式抢占锁,如果抢占成功则将state的值设置为1。然后将对象独占线程设置为当前线程。setExclusiveOwnerThread方法代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

如果抢占锁失败,就会调用acquire()方法,这个acquire()方法的实现就是在AQS类中了,说明具体抢占锁失败后的逻辑,AQS已经规定好了模板。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

独占模式是需要实现tryAcquire()方法的,这里首先就是通过tryAcquire()方法抢占锁,如果成功返回true,失败返回false。tryAcquire()方法的具体实现,是在ReentrantLock里面的,AQS类中默认是直接抛出异常的。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
     final Thread current = Thread.currentThread();
     int c = getState();// 获取state值
     if (c == 0) { 
     // 如果state值为0,说明无锁,那么就通过cas方式,尝试加锁,成功后将独占线程设置为当前线程
         if (compareAndSetState(0, acquires)) {
             setExclusiveOwnerThread(current);
             return true;
         }
     }
     else if (current == getExclusiveOwnerThread()) { // 如果是同一个线程再次来获取锁,那么就将state的值进行加1处理(可重入锁的,重入次数)。
         int nextc = c + acquires;
         if (nextc < 0) // overflow
             throw new Error("Maximum lock count exceeded");
         setState(nextc);
         return true;
     }
     return false;
 }

我们继续来看acquire()方法,在执行完tryAcquire()方法后,如果加锁失败那么就会执行addWaiter()方法和acquireQueued(),这两个方法的作用是将竞争锁失败的线程放入到等待队列中。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

addWaiter()方法的源码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

addWaiter()中主要做了三件事:

  • 将当前线程封装成Node。
  • 判断队列中尾部节点是否为空,若不为空,则将当前线程的Node节点通过CAS插入到尾部。
  • 如果尾部节点为空或CAS插入失败则通过enq()方法插入到队列中。
那么enq()方法是又是怎么插入节点的呢?

enq()方法源码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
	//开旋
        for (;;) {
	    //获取等待队列的头节点
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
		//将即将加入的节点的前节点设置为头节点
                node.prev = t;
		//设置将尾节点设置为即将加入的节点
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

这样addWaiter()方法就构造了一个队列,并将当前线程添加到了队列中了。 我们再回到acquire()方法中。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

现在就剩下acquireQueued()方法没看了,这个方法中的操作挺多的。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
	    //旋起
            for (;;) {
		//获取前级节点
                final Node p = node.predecessor();
		//如果前级节点为head,并且执行抢占锁成功。
                if (p == head && tryAcquire(arg)) {
		    //抢占锁成功,当前节点成功新的head节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
		//如果当前节点不为head,或者抢占锁失败。就根据节点的状态决定是否需要挂起线程。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
  • 首先获取节点的前级节点。
  • 如果当前节点的前级节点是head那么就可以去抢占锁了。
  • 抢占成功后就将新节点设置为head,原先的head置为空。
  • 如果抢占锁失败,则根据waitStatus值决定是否挂起线程。
  • 最后,通过cancelAcquire()取消获取锁操作。

下面看一下shouldParkAfterFailedAcquire()parkAndCheckInterrupt()这两个方法是如何挂起线程的。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
	//获取前级节点的waitStatus
        int ws = pred.waitStatus;
	//SIGNAL前面有提到,值为-1,代表后继节点处于等待状态。后继结点入队时,会将前继结点的状态更新为SIGNAL。
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
	//如果前级节点的waitStatus值大于0说明前级节点已经取消了。
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
		//如果前级节点已经是CANCEL状态了,那么会继续向前找,
		//直到找到的节点不是CANCEL(waitStatue>0)状态的节点,然后将其设置为当前节点的前级节点。
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
	    //如果前级节点为0或者其他不为-1的小于0的值,则将当前节点的前级节点设置为 SIGNAL(-1)
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt()方法的作用就是挂起线程,如果shouldParkAfterFailedAcquire()方法成功,会执行parkAndCheckInterrupt()方法,它通过LockSupport的park()方法,将当前线程挂起(WAITING),它需要unpark()方法唤醒它,通过这样一种FIFO机制的等待,来实现Lock操作。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

LockSupport是JDK从1.6开始提供的一个线程同步工具类,在这里主要用到了它的两个方法,挂起线程和唤醒线程:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static void park() {
    UNSAFE.park(false, 0L);
}
public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}

LockSupport的挂起和唤醒线程都是不可重入的,它由一个许可标志,当调用park()时就会将许可设置为0,挂起线程,如果再调用一次park(),会阻塞线程。当调用unpark()时才会将许可标志设置成1。

ReentrantLock释放锁过程

ReentrantLock释放锁的过程主要有两个阶段:

ReentrantLock释放锁的过程主要有两个阶段:

  • 释放锁。
  • 唤醒挂起的线程。 unlock()方法的源码如下。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void unlock() {
   sync.release(1);
}

释放锁的方法是写在父类,AbstractQueuedSynchronizer类中的。 源码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
	    //释放成功后,判断头节点的状态是否为无锁状态,如果不为无锁状态就将头节点中的线程唤醒。
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

我们首先释放资源来看tryRelease()方法的源码,看看释放资源是怎样的过程。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected final boolean tryRelease(int releases) {
	    //从state中减去传入参数的相应值(一般为1)
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

tryRelease()方法在释放锁资源时,可以单纯的理解为是修改独占模式的状态值和置空占有线程的操作。将state的值减掉相应的参数值(一般是1),如果计算结果为0,就将他的独占线程设置为null,其他线程才有机会抢占成功。 在加锁时,同一线程加一次锁,state状态值就会加1,在解锁的时候每解锁一次就会减1,同一个锁可重入,只有lock次数与unlock次数相同才会释放资源,将独占线程设置为null。

释放了资源后,我们再看唤醒挂起线程时的过程。这个过程就在unparkSuccessor()方法中。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

这个释放过程就是将需要释放的线程节点设置成无锁状态,然后去队列中找到可以唤醒的节点,进行唤醒线程。 有一点需要解释一下,就是在寻找可以唤醒的节点时,为什么要从后向前找? 在上面unparkSuccessor()方法的源码里面有一段英文注释(7行~12行):

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
	#这段英文注释翻译过来的大概意思就是:线程唤醒的时候,
	#通常是从当前线程的下个节点线程开始寻找,但是下个节点有可能已经取消了或者为null了,
	#所以从后向前找,直到找到一个非 取消状态的节点线程。

独占模式总结

  1. 通过cas抢占锁,如果抢占成功,更改state为+1,更改抢占线程为当前线程
  2. 如果抢占失败,将线程封装成node节点并通过cas放入等待队列,并将线程挂起
  3. 线程释放锁时,会唤醒下一个节点,如果下一个节点为空,则从尾部向前遍历寻找不为空节点唤醒

看完独占模式的锁可能会有的疑问

  • Q:tryReleas为什么没有递归释放锁? A:因为不需要在AQS里递归释放锁,因为一个线程不断地抢到锁,它也就会不断地lock(),unlock()
  • Q:AQS为什么在下一个节点为空的时候,要从队列的尾节点唤醒? A:因为高并发下的enq()方法的原子性问题,参考这篇文章 https://blog.csdn.net/foxException/article/details/108917338

共享模式分析

CountDownLatch为例

CountDownLatch的获取共享资源的过程

在使用CountDownLatch的时候,是先创建CountDownLatch对象,然后在每次执行完一个任务后,就执行一次countDown()方法。直到通过getCount()获取到的值为0时才算执行完,如果count值不为0可通过await()方法让主线程进行等待,直到所有任务都执行完成,count的值被设为0。 那么我们先来看创建CountDownLatch的方法。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

我们看到创建CountDownLatch的过程,其实就是将count值赋值给state的过程。

再来看await()方法的源码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
	//如果线程已经中断,直接抛出异常结束。
        if (Thread.interrupted())
            throw new InterruptedException();
	//CountDownLatch重写的tryAcquireShared()方法,如果state!=0,则返回-1
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
	//添加一个共享node到等待队列中
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
		//获取前置节点
                final Node p = node.predecessor();
                if (p == head) {
		    //如果前级节点是头节点,直接尝试获取共享资源。
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
			//如果获取共享资源成功,将head节点指向自己
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
		//如果不是前级节点不是head节点,就根据前级节点状态,判断是否需要挂起线程。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
		//如果执行失败,取消获取共享资源的操作。
                cancelAcquire(node);
        }
    }

这里的方法和独占模式下acquireQueued()方法很像,只是在设置头节点唤醒新线程的时候有所不同,在setHeadAndPropagate()方法里面。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

setHeadAndPropagate()这个方法名称翻译成中文是“设置头节点并传播”,其实就是在获取共享锁资源的时候,如果资源除了用于唤醒下一个节点后,还有剩余,就会用于唤醒后面的节点,直到资源被用完。这里充分体现共享模式的“ 共享 ”。

CountDownLatch释放资源

我们再来看countDown()方法是如何释放资源的。 源码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void countDown() {
    sync.releaseShared(1);
}

CountDownLatch中内部类Sync的releaseShared()方法,是使用的AQS的releaseShared()方法。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public final boolean releaseShared(int arg) {
	//尝试释放资源
        if (tryReleaseShared(arg)) {
	    //释放资源成功后,唤醒节点
            doReleaseShared();
            return true;
        }
        return false;
    }

尝试释放资源方法tryReleaseShared()是AQS规定需要自己来实现的,CountDownLatch的实现如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0) // 若state为0,说明已经不需要释放资源了,直接返回false。
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))// 真正的释放资源,是通过CAS的方式将state的值减1。
            return nextc == 0;
    }
}

其实主要的就是通过CAS的方式将state的值减1的操作。 释放资源成功后,就到了唤醒节点的过程了,在doReleaseShared()方法中。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private void doReleaseShared() {
  for (;;) {
      Node h = head;
      if (h != null && h != tail) {// 当头节点不为空,并且不等于尾节点时,从头开始唤醒。
          int ws = h.waitStatus;// 获取头节点的等待状态
          if (ws == Node.SIGNAL) {// 如果头节点状态为等待唤醒,那么将头节点的状态设置为无锁状态,若CAS设置节点状态失败,就自旋。
              if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                  continue;            // loop to recheck cases
              unparkSuccessor(h);// 唤醒头节点
          }// 如果head节点的状态已经为无锁状态了,那么将head节点状态设置为可以向下传播唤醒的状态(PROPAGATE)。
          else if (ws == 0 &&
                   !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
              continue;                // loop on failed CAS
      }
      // 若在执行过程中,head节点发生的变化,直接跳出循环。
      if (h == head)                   // loop if head changed
          break;
  }
}

至此,AQS的独占模式和共享模式,在获取共享资源和释放共享资源的过程,就总结完了。内容有点多,需要好好消化一下,能看到最后的也都厉害的人物,因为我自己在总结这部分内容的时候也是查阅了很多资料,看了很多源码,用了好几天的时间才自己总结明白,AQS到底是个什么东西,是怎么一个执行过程。

其实AQS里面不只我上面总结的这些内容,里面比如还有Condition、以及可中断的获取资源(acquireInterruptibly【独占】、acquireSharedInterruptibly【共享】acquire()和acquireShared()在线程等待过程中都是忽略中断的),还有ReentrantLock是如何实现公平锁的(其实是在竞争资源时如果有新进入的线程,先判断队列中是否有节点,如果有直接插入队尾等待,按顺序获取资源)。

通过总结了AQS,基于AQS实现的ReentrantLock、CountDownLatch、Semaphore等的源码基本上就能看懂了,甚至再上层的CyclicBarrier、CopyOnWriteArrayList我们通过看源码也能知道大概是一个什么过程了。

最后,内容有点多,有写的不好的地方也欢迎指正(我要是能闹明白就改,不明白我也没法改😂)。

内容来源

你来讲讲AQS是什么吧?都是怎么用的? - 纪莫 - 博客园 (cnblogs.com)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
AQS解析与实战
前段时间在面试,发现面试官都有问到同步器AQS的相关问题。AQS为Java中几乎所有的锁和同步器提供一个基础框架,派生出如ReentrantLock、Semaphore、CountDownLatch等AQS全家桶。本文基于AQS原理的几个核心点,谈谈对AbstractQueuedSynchronizer的理解,并实现一个自定义同步器。
捡田螺的小男孩
2020/04/15
6740
AQS解析与实战
【冲刺大厂面试】锁和分布式锁的那些事之AQS核心原理
AQS是什么?AQS是JDK提供的一个Java类(AbstractQueuedSynchronizer)
用户4919348
2022/04/13
4140
【冲刺大厂面试】锁和分布式锁的那些事之AQS核心原理
【深入AQS原理】我画了35张图就是为了让你深入 AQS
谈到并发,我们不得不说AQS(AbstractQueuedSynchronizer),所谓的AQS即是抽象的队列式的同步器,内部定义了很多锁相关的方法,我们熟知的ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore等都是基于AQS来实现的。
一枝花算不算浪漫
2020/05/07
2.3K1
【深入AQS原理】我画了35张图就是为了让你深入 AQS
【腾讯阿里最全面试题】介绍下Synchronized、Volatile、CAS、AQS,以及各自的使用场景
谈到并发,不得不谈ReentrantLock;而谈ReentrantLock,不得不谈AbstractQueuedSynchronizer(AQS)!
一个会写诗的程序员
2020/12/21
1.4K0
【腾讯阿里最全面试题】介绍下Synchronized、Volatile、CAS、AQS,以及各自的使用场景
AQS源码分析看这一篇就够了
好了,我们来开始今天的内容,首先我们来看下AQS是什么,全称是 AbstractQueuedSynchronizer 翻译过来就是【抽象队列同步】对吧。通过名字我们也能看出这是个抽象类
用户4919348
2020/06/02
1.1K0
话说AQS
AQS 如果没有具体的实现类,DEMO是没有意义的 , 我们先简单看一下里边常用的一些方法吧
木子的昼夜
2021/03/09
2800
话说AQS
面试官:从源码角度讲讲ReentrantLock及队列同步器(AQS)
JDK 中独占锁(排他锁)的实现除了使用关键字 synchronized 外,还可以使用ReentrantLock。虽然在性能上 ReentrantLock 和 synchronized 没有什么大区别,但 ReentrantLock 相比 synchronized 而言功能更加丰富,使用起来更为灵活,也更适合复杂的并发场景。
业余草
2021/12/06
3560
面试官:从源码角度讲讲ReentrantLock及队列同步器(AQS)
多线程进阶——JUC并发编程之抽象同步队列AQS框架设计理念一探究竟🔥
现在看来我们还有点懵逼,这个框架具体是怎么设计的?下面我们翻看源码注释一探究竟!其中AQS里面维护了一个Node节点构造的CLH队列(FIFO)先进先出队列。
须臾之余
2021/12/28
3620
多线程进阶——JUC并发编程之抽象同步队列AQS框架设计理念一探究竟🔥
如何手写一个AQS?
AQS即AbstractQueuedSynchronizer,是用来实现锁和线程同步的一个工具类。大部分操作基于CAS和FIFO队列来实现。
Java识堂
2021/03/30
4600
如何手写一个AQS?
硬核的AQS
Java多线程在对共享资源进行访问时,如果不加以控制会存在线程安全问题,当我们使用多线程对共享资源访问时,通常会线程共享资源的进行访问线程数的控制:
shysh95
2021/04/07
2990
硬核的AQS
从ReentrantLock的实现看AQS的原理及应用
AQS作为JUC中构建锁或者其他同步组件的基础框架,应用范围十分广泛,这篇文章会带着大家从可重入锁一点点揭开AQS的神秘面纱。
美团技术团队
2019/12/10
1.7K2
从ReentrantLock的实现看AQS的原理及应用
别走!这里有个笔记:图文讲解 AQS ,一起看看 AQS 的源码……(图文较长)
" AbstractQueuedSynchronizer 抽象队列同步器,简称 AQS 。是在 JUC 包下面一个非常重要的基础组件,JUC 包下面的并发锁 ReentrantLock 、 CountDownLatch 等都是基于 AQS 实现的。所以想进一步研究锁的底层原理,非常有必要先了解 AQS 的原理 "
程序员小航
2020/11/23
5240
别走!这里有个笔记:图文讲解 AQS ,一起看看 AQS 的源码……(图文较长)
到底什么是AQS?面试时你能说明白吗!
上篇文章写到CAS算法时,里面使用AtomicInteger举例说明,这个类在java.unit.concurrent.atomic包中,存储的都是一些原子类,除此之外,“java.unit.concurrent”,这个包作为Java中最重要的一个并发工具包,大部分的并发类都在其中,我们今天就来继续学习这个包中的其他并发工具类。
JavaBuild
2024/05/27
1190
到底什么是AQS?面试时你能说明白吗!
一文读懂Java并发编程之AQS
AbstractQueuedSynchronizer是一个双端队列,元素可以从队首进出,也可以从队尾进出,下面是AbstractQueuedSynchronizer 的成员变量,后面我们就叫 AbstractQueuedSynchronizer 为AQS。
小四的技术之旅
2022/07/26
5340
一文读懂Java并发编程之AQS
1.5w字,30图带你彻底掌握 AQS!
AQS( AbstractQueuedSynchronizer )是一个用来构建锁和同步器(所谓同步,是指线程之间的通信、协作)的框架,Lock 包中的各种锁(如常见的 ReentrantLock, ReadWriteLock), concurrent 包中的各种同步器(如 CountDownLatch, Semaphore, CyclicBarrier)都是基于 AQS 来构建,所以理解 AQS 的实现原理至关重要,AQS 也是面试中区分侯选人的常见考点,我们务必要掌握,本文将用循序渐近地介绍 AQS,相信大家看完一定有收获。文章目录如下
敖丙
2020/10/27
7970
1.5w字,30图带你彻底掌握 AQS!
图文并茂:AQS 是怎么运行的?
如果你想深入研究Java并发的话,那么AQS一定是绕不开的一块知识点,Java并发包很多的同步工具类底层都是基于AQS来实现的,比如我们工作中经常用的Lock工具ReentrantLock、栅栏CountDownLatch、信号量Semaphore等,而且关于AQS的知识点也是面试中经常考察的内容,所以,无论是为了更好的使用还是为了应付面试,深入学习AQS都很有必要。
Java技术栈
2020/12/08
4990
图文并茂:AQS 是怎么运行的?
【洞悉AQS】通过ReentrantLock一步一图彻底了解AQS实现原理
谈到并发,我们不得不说AQS(AbstractQueuedSynchronizer),所谓的AQS即是抽象的队列式的同步器,内部定义了很多锁相关的方法,例如:
一枝花算不算浪漫
2022/05/11
3230
【洞悉AQS】通过ReentrantLock一步一图彻底了解AQS实现原理
Java并发之AQS详解[通俗易懂]
今天学了学并发AQS机制,是抽象队列同步器,用户主要通过继承AQS类,来实现自定义锁,从而完成特定功能,AQS提供了两种锁(1)共享锁(2)排他锁。 下面这个博客介绍的AQS机制挺不错可以看看 原文链接 一、概述   谈到并发,不得不谈ReentrantLock;而谈到ReentrantLock,不得不谈AbstractQueuedSynchronizer(AQS)!
全栈程序员站长
2022/09/22
6730
JUC并发—5.AQS源码分析一
ReentractLock是重入锁,属于排他锁,功能和synchronized类似。但是在实际中,其实比较少会使用ReentrantLock。因为ReentrantLock的实现及性能和syncrhonized差不多,所以一般推荐使用synchronized而不是ReentrantLock。
东阳马生架构
2025/04/24
1010
并发编程之深入理解ReentrantLock和AQS原理
AQS(AbstractQueuedSynchronizer)在并发编程中占有很重要的地位,可能很多人在平时的开发中并没有看到过它的身影,但是当我们有看过concurrent包一些JDK并发编程的源码的时候,就会发现很多地方都使用了AQS,今天我们一起来学习一下AQS的原理,本文会用通俗易懂的语言描述AQS的原理。当然如果你了解CAS操作、队列、那么我相信你学习起来会感到无比轻松。
全栈程序员站长
2021/08/05
2960
相关推荐
AQS解析与实战
更多 >
LV.4
这个人很懒,什么都没有留下~
加入讨论
的问答专区 >
1产品KOL擅长5个领域
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
    本文部分代码块支持一键运行,欢迎体验
    本文部分代码块支持一键运行,欢迎体验