在Java并发编程中,AQS(AbstractQueuedSynchronizer)是一个非常重要的基础框架,它提供了构建锁和其他同步器的基础。理解AQS的原理对于掌握Java并发编程至关重要。本文将从多个角度深入探讨AQS的原理,包括其定义、核心内容、与Lock锁的继承关系、公平锁与非公平锁的直观体现、acquire、tryAcquire、addWaiter等方法的底层逻辑,以及AQS排队后如何重新尝试获取资源。同时,作为一个大数据工程师,我们将通过Java代码演示这些原理。
AQS,全称AbstractQueuedSynchronizer,是Java并发包(java.util.concurrent.locks)下的一个抽象类。它定义了一套多线程访问共享资源的同步器框架,许多我们使用的同步器都是基于它来实现的,如常用的ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等并发类。
AQS的核心成员变量包括:
private transient volatile Node head;
:CHL队列的头部节点,延迟初始化。除了初始化,它只通过setHead()
方法进行修改。如果head
节点存在,head
节点的waitStatus
保证不会被CANCELLED
。private transient volatile Node tail;
:CHL队列的尾部节点,延迟初始化。仅通过enq()
方法新增等待的节点。private volatile int state;
:表示共享资源的获取情况。为0时代表着没有线程获取过此资源,而等它大于0时,则表示有线程正在获取着资源。由于AQS支持可重入机制,state
为0表示没有线程拿到锁,而当state
为n时(n >= 1),表示线程拿到锁,n为重入次数。private transient Thread exclusiveOwnerThread;
:表示当前占据锁的线程。AQS的核心方法包括:
tryAcquire(int arg)
:独占方式。尝试获取资源,成功则返回true,失败则返回false。tryRelease(int arg)
:独占方式。尝试释放资源,成功则返回true,失败则返回false。tryAcquireShared(int arg)
:共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。tryReleaseShared(int arg)
:共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。isHeldExclusively()
:该线程是否正在独占资源。只有用到condition才需要去实现它。AQS需要子类复写的方法均没有声明为abstract,目的是避免子类需要强制性覆写多个方法。因为一般自定义同步器要么是独占要么是共享方式,只需实现tryAcquire-tryRelease
或tryAcquireShared-tryReleaseShared
中的一种组合即可。当然,AQS也支持子类同时实现独占和共享两种模式,如ReentrantReadWriteLock
。
Lock接口的最主要实现类ReentrantLock中所有的方法实际上都是调用了其静态内部类Sync中的方法。Sync继承了AbstractQueuedSynchronizer(AQS),也就是说,Lock锁的整个体系是基于AQS同步器实现的。
ReentrantLock并非是直接继承AQS并实现里面的方法的,而是由里面的Sync类来继承AQS,并且里面还划分了NonfairSync和FairSync两个Sync子静态内部类来分别对公平锁和非公平锁做逻辑实现。
在Java的ReentrantLock中,可以通过构造函数传入一个boolean值fair
来设置锁是否为公平锁。默认是非公平锁,这是因为非公平锁的优点在于吞吐量比公平锁大。
state
从0置为1,如果成功则获取锁;如果失败,则执行acquire
方法加入等待队列。acquire
方法加入等待队列。acquire
方法是AQS中用于获取独占锁的核心方法。其源码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
head.next
),就尝试获取资源。如果该方法返回true,则会进入selfInterrupt()
的逻辑,进行阻塞。tryAcquire
方法是AQS提供给子类实现的钩子方法,子类可以自定义实现独占式获取资源的方式。以ReentrantLock的非公平锁实现为例:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
state
为0(表示锁未被占用),则尝试通过CAS将state
设置为请求的数量acquires
,并设置当前线程为锁的持有者,返回true表示获取锁成功。state
增加请求的数量acquires
,表示锁的重入次数增加,返回true表示获取锁成功。addWaiter
方法将当前线程包装成一个节点,加入同步队列的队尾。其源码如下:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
node
,将当前线程和模式(独占或共享)封装到节点中。tail
不为null),则尝试通过CAS将新节点设置为队尾,并更新原队尾的next
指针指向新节点。enq(node)
方法不断尝试,直到设置成功。enq(node)
方法的源码如下:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
tail
为null),则初始化头节点head
为一个虚拟节点(不代表任何线程),并将tail
指向head
。next
指针指向新节点。acquireQueued
方法用于判断当前节点是否需要阻塞,并在一定条件下将其阻塞。其源码如下:
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
p
。p
是头节点head
,并且尝试获取锁成功(即tryAcquire(arg)
返回true),则将当前节点设置为头节点,并返回中断状态interrupted
。shouldParkAfterFailedAcquire(p, node)
判断是否需要阻塞当前节点。如果需要阻塞,则调用parkAndCheckInterrupt()
将当前线程阻塞,并返回中断状态interrupted
。interrupted
。shouldParkAfterFailedAcquire
方法用于判断当前节点是否需要阻塞。其源码如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
pred
的等待状态ws
。ws
为SIGNAL
(表示后继节点需要被唤醒),则返回true,表示当前节点需要阻塞。ws
大于0(表示前驱节点已被取消),则不断向前遍历,直到找到一个非取消状态的前驱节点,并更新当前节点的前驱指针prev
。然后返回false,表示当前节点不需要阻塞(因为前面还有有效节点在等待)。ws
为0或其他负值(表示正常等待状态),则通过CAS将前驱节点的等待状态设置为SIGNAL
,并返回false,表示当前节点不需要阻塞(因为前驱节点已经设置为需要唤醒后继节点的状态)。parkAndCheckInterrupt
方法用于将当前线程阻塞,并检查在阻塞过程中是否被中断。其源码如下:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
LockSupport.park(this)
将当前线程阻塞。Thread.interrupted()
的结果,表示在阻塞过程中是否被中断。如果返回true,则表示被中断;如果返回false,则表示未被中断。假设我们有一个银行柜台服务,只有一个窗口可以办理业务。有多个客户(线程)需要办理业务,如果窗口被占用,则其他客户需要排队等待。这个场景非常适合使用ReentrantLock和AQS来实现。
下面是一个基于ReentrantLock和AQS的简单示例代码,演示了如何实现上述业务场景。
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BankCounter {
private final Lock lock = new ReentrantLock();
public void serveCustomer(String customerName) {
lock.lock();
try {
System.out.println(customerName + " 正在办理业务");
// 模拟业务办理时间
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println(customerName + " 被中断");
}
System.out.println(customerName + " 办理业务完毕");
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
BankCounter bankCounter = new BankCounter();
Runnable task = () -> {
String customerName = Thread.currentThread().getName();
bankCounter.serveCustomer(customerName);
};
Thread customer1 = new Thread(task, "客户1");
Thread customer2 = new Thread(task, "客户2");
Thread customer3 = new Thread(task, "客户3");
customer1.start();
customer2.start();
customer3.start();
}
}
ReentrantLock
对象lock
,用于控制对银行柜台的访问。serveCustomer
方法模拟了为客户办理业务的逻辑。在办理业务之前,先通过lock.lock()
获取锁;在办理业务之后,通过lock.unlock()
释放锁。如果在办理业务过程中被中断,则打印中断信息,并重新抛出中断异常(通过Thread.currentThread().interrupt()
设置中断状态)。BankCounter
对象bankCounter
。bankCounter.serveCustomer
方法为客户办理业务。当运行上述代码时,输出结果可能如下(具体顺序可能因线程调度而异):
客户1 正在办理业务
客户1 办理业务完毕
客户2 正在办理业务
客户2 办理业务完毕
客户3 正在办理业务
客户3 办理业务完毕
从输出结果可以看出,三个客户依次办理了业务,没有并发冲突。这是因为ReentrantLock
通过AQS实现了线程的同步控制,确保了同一时间只有一个客户能够办理业务。
本文深入探讨了AQS的原理及其在Java并发编程中的应用。首先解释了AQS的定义和核心内容,包括核心成员变量和方法。然后分析了Lock锁和AQS的继承关系,以及公平锁和非公平锁的直观体现。接着详细解析了AQS的acquire
、tryAcquire
、addWaiter
等方法的底层逻辑,以及AQS排队后如何重新尝试获取资源。最后通过一个具体的业务场景和示例代码演示了AQS在实际应用中的使用。
作为一个大数据工程师,理解AQS的原理对于处理并发数据访问、实现高效的同步机制至关重要。希望本文能够帮助读者更好地掌握AQS的原理和应用。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。