首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >AQS原理及多线程资源获取优化探讨

AQS原理及多线程资源获取优化探讨

原创
作者头像
小马哥学JAVA
发布2024-12-31 10:07:15
发布2024-12-31 10:07:15
21600
代码可运行
举报
运行总次数:0
代码可运行

一、引言

在Java并发编程中,AQS(AbstractQueuedSynchronizer)是一个非常重要的基础框架,它提供了构建锁和其他同步器的基础。理解AQS的原理对于掌握Java并发编程至关重要。本文将从多个角度深入探讨AQS的原理,包括其定义、核心内容、与Lock锁的继承关系、公平锁与非公平锁的直观体现、acquire、tryAcquire、addWaiter等方法的底层逻辑,以及AQS排队后如何重新尝试获取资源。同时,作为一个大数据工程师,我们将通过Java代码演示这些原理。

二、AQS是什么?核心内容有什么?

2.1 AQS的定义

AQS,全称AbstractQueuedSynchronizer,是Java并发包(java.util.concurrent.locks)下的一个抽象类。它定义了一套多线程访问共享资源的同步器框架,许多我们使用的同步器都是基于它来实现的,如常用的ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等并发类。

2.2 AQS的核心内容

2.2.1 核心成员变量

AQS的核心成员变量包括:

  • private transient volatile Node head;:CHL队列的头部节点,延迟初始化。除了初始化,它只通过setHead()方法进行修改。如果head节点存在,head节点的waitStatus保证不会被CANCELLED
  • private transient volatile Node tail;:CHL队列的尾部节点,延迟初始化。仅通过enq()方法新增等待的节点。
  • private volatile int state;:表示共享资源的获取情况。为0时代表着没有线程获取过此资源,而等它大于0时,则表示有线程正在获取着资源。由于AQS支持可重入机制,state为0表示没有线程拿到锁,而当state为n时(n >= 1),表示线程拿到锁,n为重入次数。
  • private transient Thread exclusiveOwnerThread;:表示当前占据锁的线程。
2.2.2 核心方法

AQS的核心方法包括:

  • tryAcquire(int arg):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int arg):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int arg):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int arg):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。

AQS需要子类复写的方法均没有声明为abstract,目的是避免子类需要强制性覆写多个方法。因为一般自定义同步器要么是独占要么是共享方式,只需实现tryAcquire-tryReleasetryAcquireShared-tryReleaseShared中的一种组合即可。当然,AQS也支持子类同时实现独占和共享两种模式,如ReentrantReadWriteLock

三、Lock锁和AQS的继承关系

3.1 Lock接口与AQS的关系

Lock接口的最主要实现类ReentrantLock中所有的方法实际上都是调用了其静态内部类Sync中的方法。Sync继承了AbstractQueuedSynchronizer(AQS),也就是说,Lock锁的整个体系是基于AQS同步器实现的。

3.2 ReentrantLock与AQS的具体实现

ReentrantLock并非是直接继承AQS并实现里面的方法的,而是由里面的Sync类来继承AQS,并且里面还划分了NonfairSync和FairSync两个Sync子静态内部类来分别对公平锁和非公平锁做逻辑实现。

四、公平和非公平锁的直观体现

4.1 定义与特点

  • 公平锁:多个线程按照申请锁的顺序来获取锁,即按照线程的先后顺序来排队获取锁。当一个线程释放锁后,等待时间最长的线程会获得锁的访问权。公平锁能够保证每个线程都有机会获取到锁,避免饥饿现象的发生,但整体效率相对比较低。
  • 非公平锁:多个线程获取锁的顺序是不确定的,不按照申请锁的顺序来排队。一个线程在等待锁时,不管自己是不是在等待队列的头部,都有机会在其他线程释放锁后立即获取锁。非公平锁相对公平锁增加了获取资源的不确定性,但是整体效率得以提升,但可能会产生饥饿现象。

4.2 在ReentrantLock中的实现

在Java的ReentrantLock中,可以通过构造函数传入一个boolean值fair来设置锁是否为公平锁。默认是非公平锁,这是因为非公平锁的优点在于吞吐量比公平锁大。

  • 非公平锁:在获取锁时,直接尝试CAS将当前的state从0置为1,如果成功则获取锁;如果失败,则执行acquire方法加入等待队列。
  • 公平锁:在获取锁时,不直接尝试CAS,而是直接执行acquire方法加入等待队列。

五、AQS的acquire、tryAcquire、addWaiter底层逻辑

5.1 acquire方法

acquire方法是AQS中用于获取独占锁的核心方法。其源码如下:

代码语言:javascript
代码运行次数:0
运行
复制
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  • tryAcquire(int arg):由子类实现,尝试获取独占锁。如果成功,则返回true,方法直接返回;如果失败,则返回false,执行后续逻辑。
  • addWaiter(Node.EXCLUSIVE):将当前线程包装成一个独占式的节点,加入同步队列的队尾,并返回当前线程所在的节点。
  • acquireQueued(addWaiter(Node.EXCLUSIVE), arg):如果当前节点是等待节点的第一个(即head.next),就尝试获取资源。如果该方法返回true,则会进入selfInterrupt()的逻辑,进行阻塞。
  • selfInterrupt():响应中断的逻辑,与主线关系不大,这里不详细分析。

5.2 tryAcquire方法

tryAcquire方法是AQS提供给子类实现的钩子方法,子类可以自定义实现独占式获取资源的方式。以ReentrantLock的非公平锁实现为例:

代码语言:javascript
代码运行次数:0
运行
复制
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
        setState(nextc);
return true;
    }
return false;
}
  • 如果state为0(表示锁未被占用),则尝试通过CAS将state设置为请求的数量acquires,并设置当前线程为锁的持有者,返回true表示获取锁成功。
  • 如果当前线程已经是锁的持有者(即重入锁的情况),则将state增加请求的数量acquires,表示锁的重入次数增加,返回true表示获取锁成功。
  • 如果上述两种情况都不满足,则返回false表示获取锁失败。

5.3 addWaiter方法

addWaiter方法将当前线程包装成一个节点,加入同步队列的队尾。其源码如下:

代码语言:javascript
代码运行次数:0
运行
复制
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
        node.prev = pred;
if (compareAndSetTail(pred, node)) {
            pred.next = node;
return node;
        }
    }
    enq(node);
return node;
}
  • 创建一个新的节点node,将当前线程和模式(独占或共享)封装到节点中。
  • 如果队列不为空(即tail不为null),则尝试通过CAS将新节点设置为队尾,并更新原队尾的next指针指向新节点。
  • 如果CAS失败(可能是因为队列为空或并发修改导致),则调用enq(node)方法不断尝试,直到设置成功。

enq(node)方法的源码如下:

代码语言:javascript
代码运行次数:0
运行
复制
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
if (compareAndSetTail(t, node)) {
                t.next = node;
return t;
            }
        }
    }
}
  • 自旋循环,直到将新节点成功加入队尾为止。
  • 如果队列为空(即tail为null),则初始化头节点head为一个虚拟节点(不代表任何线程),并将tail指向head
  • 如果队列不为空,则尝试通过CAS将新节点设置为队尾,并更新原队尾的next指针指向新节点。

六、AQS排队后如何重新尝试获取资源

6.1 acquireQueued方法

acquireQueued方法用于判断当前节点是否需要阻塞,并在一定条件下将其阻塞。其源码如下:

代码语言:javascript
代码运行次数:0
运行
复制
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
return interrupted;
            }
if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } catch (Throwable t) {
        cancelAcquire(node);
if (interrupted)
            selfInterrupt();
throw t;
    }
}
  • 获取当前节点的前驱节点p
  • 如果前驱节点p是头节点head,并且尝试获取锁成功(即tryAcquire(arg)返回true),则将当前节点设置为头节点,并返回中断状态interrupted
  • 如果不满足上述条件,则调用shouldParkAfterFailedAcquire(p, node)判断是否需要阻塞当前节点。如果需要阻塞,则调用parkAndCheckInterrupt()将当前线程阻塞,并返回中断状态interrupted
  • 如果在阻塞过程中被中断,或者在执行过程中抛出异常,则取消当前节点的获取锁操作,并处理中断状态interrupted

6.2 shouldParkAfterFailedAcquire方法

shouldParkAfterFailedAcquire方法用于判断当前节点是否需要阻塞。其源码如下:

代码语言:javascript
代码运行次数:0
运行
复制
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
return false;
}
  • 获取前驱节点pred的等待状态ws
  • 如果wsSIGNAL(表示后继节点需要被唤醒),则返回true,表示当前节点需要阻塞。
  • 如果ws大于0(表示前驱节点已被取消),则不断向前遍历,直到找到一个非取消状态的前驱节点,并更新当前节点的前驱指针prev。然后返回false,表示当前节点不需要阻塞(因为前面还有有效节点在等待)。
  • 如果ws为0或其他负值(表示正常等待状态),则通过CAS将前驱节点的等待状态设置为SIGNAL,并返回false,表示当前节点不需要阻塞(因为前驱节点已经设置为需要唤醒后继节点的状态)。

6.3 parkAndCheckInterrupt方法

parkAndCheckInterrupt方法用于将当前线程阻塞,并检查在阻塞过程中是否被中断。其源码如下:

代码语言:javascript
代码运行次数:0
运行
复制
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
return Thread.interrupted();
}
  • 调用LockSupport.park(this)将当前线程阻塞。
  • 返回Thread.interrupted()的结果,表示在阻塞过程中是否被中断。如果返回true,则表示被中断;如果返回false,则表示未被中断。

七、业务场景与示例代码

7.1 业务场景

假设我们有一个银行柜台服务,只有一个窗口可以办理业务。有多个客户(线程)需要办理业务,如果窗口被占用,则其他客户需要排队等待。这个场景非常适合使用ReentrantLock和AQS来实现。

7.2 示例代码

下面是一个基于ReentrantLock和AQS的简单示例代码,演示了如何实现上述业务场景。

代码语言:javascript
代码运行次数:0
运行
复制
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BankCounter {
private final Lock lock = new ReentrantLock();
public void serveCustomer(String customerName) {
        lock.lock();
try {
            System.out.println(customerName + " 正在办理业务");
// 模拟业务办理时间
try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println(customerName + " 被中断");
            }
            System.out.println(customerName + " 办理业务完毕");
        } finally {
            lock.unlock();
        }
    }
public static void main(String[] args) {
BankCounter bankCounter = new BankCounter();
Runnable task = () -> {
String customerName = Thread.currentThread().getName();
            bankCounter.serveCustomer(customerName);
        };
Thread customer1 = new Thread(task, "客户1");
Thread customer2 = new Thread(task, "客户2");
Thread customer3 = new Thread(task, "客户3");
        customer1.start();
        customer2.start();
        customer3.start();
    }
}

7.3 代码解析

  • BankCounter类
    • 定义了一个ReentrantLock对象lock,用于控制对银行柜台的访问。
    • serveCustomer方法模拟了为客户办理业务的逻辑。在办理业务之前,先通过lock.lock()获取锁;在办理业务之后,通过lock.unlock()释放锁。如果在办理业务过程中被中断,则打印中断信息,并重新抛出中断异常(通过Thread.currentThread().interrupt()设置中断状态)。
  • main方法
    • 创建了一个BankCounter对象bankCounter
    • 定义了三个线程(代表三个客户),每个线程都执行相同的任务:调用bankCounter.serveCustomer方法为客户办理业务。
    • 启动三个线程,模拟多个客户同时办理业务的场景。

7.4 运行结果

当运行上述代码时,输出结果可能如下(具体顺序可能因线程调度而异):

代码语言:javascript
代码运行次数:0
运行
复制
客户1 正在办理业务
客户1 办理业务完毕
客户2 正在办理业务
客户2 办理业务完毕
客户3 正在办理业务
客户3 办理业务完毕

从输出结果可以看出,三个客户依次办理了业务,没有并发冲突。这是因为ReentrantLock通过AQS实现了线程的同步控制,确保了同一时间只有一个客户能够办理业务。

八、总结

本文深入探讨了AQS的原理及其在Java并发编程中的应用。首先解释了AQS的定义和核心内容,包括核心成员变量和方法。然后分析了Lock锁和AQS的继承关系,以及公平锁和非公平锁的直观体现。接着详细解析了AQS的acquiretryAcquireaddWaiter等方法的底层逻辑,以及AQS排队后如何重新尝试获取资源。最后通过一个具体的业务场景和示例代码演示了AQS在实际应用中的使用。

作为一个大数据工程师,理解AQS的原理对于处理并发数据访问、实现高效的同步机制至关重要。希望本文能够帮助读者更好地掌握AQS的原理和应用。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、引言
  • 二、AQS是什么?核心内容有什么?
    • 2.1 AQS的定义
    • 2.2 AQS的核心内容
      • 2.2.1 核心成员变量
      • 2.2.2 核心方法
  • 三、Lock锁和AQS的继承关系
    • 3.1 Lock接口与AQS的关系
    • 3.2 ReentrantLock与AQS的具体实现
  • 四、公平和非公平锁的直观体现
    • 4.1 定义与特点
    • 4.2 在ReentrantLock中的实现
  • 五、AQS的acquire、tryAcquire、addWaiter底层逻辑
    • 5.1 acquire方法
    • 5.2 tryAcquire方法
    • 5.3 addWaiter方法
  • 六、AQS排队后如何重新尝试获取资源
    • 6.1 acquireQueued方法
    • 6.2 shouldParkAfterFailedAcquire方法
    • 6.3 parkAndCheckInterrupt方法
  • 七、业务场景与示例代码
    • 7.1 业务场景
    • 7.2 示例代码
    • 7.3 代码解析
    • 7.4 运行结果
  • 八、总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档