前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >万字图解 Java 并发框架:Fork/Join、CountDownLatch、Semaphore、CyclicBarrier

万字图解 Java 并发框架:Fork/Join、CountDownLatch、Semaphore、CyclicBarrier

作者头像
码哥字节
发布于 2025-04-23 04:39:33
发布于 2025-04-23 04:39:33
41302
代码可运行
举报
文章被收录于专栏:Java 技术栈Java 技术栈
运行总次数:2
代码可运行

本文图多,内容硬核,建议收藏。

第一章节《1.6w 字图解 Java 并发:多线程挑战、线程状态和通信、死锁;AQS、ReentrantLock、Condition 使用和原理》,我们开启了 Java 高并发系列的学习,透彻理解 Java 并发编程基础,主要内容有:

  1. 多线程挑战与难点
    1. 上下文切换
    2. 死锁
    3. 资源限制的挑战
    4. 什么是线程
    5. 线程的状态
    6. 线程间的通信
  2. Java 各种各样的锁使用和原理
    1. syncconized 的使用和原理
    2. AQS 实现原理
    3. ReentrantLock 的使用和原理
    4. ReentrantReadWriteLock 读写锁使用和原理
    5. Condition 的使用和原理

第二章《1.8w 字图解 Java 并发容器: CHM、ConcurrentLinkedQueue、7 种阻塞队列的使用和原理》主要内容如下:

  • ConcurrentHashMap的使用和原理
  • ConcurrentLinkedQueue 的使用和原理
  • Java 7 种阻塞队列的使用和原理详解:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、SynchronousQueue、LinkedTransferQueue以及LinkedBlockingDeque

第三章节围绕着 Java 并发框架展开,主要内容如下:

  • Fork/Join 框架的使用和原理
  • CountDownLatch的使用和原理
  • CyclicBarrier的使用和原理
  • Semaphore的使用和原理
  • Exchanger 的使用和原理

开搞!

Fork/Join 框架

Chaya:码哥,什么是 Fork/Join 框架?

Fork/Join 是 Java 7 引入的并行计算框架,核心思想是 **"分而治之"**。它通过以下特性解决复杂计算问题:

  • 自动任务拆分:将大任务递归拆分为子任务
  • 工作窃取算法(Work-Stealing):最大化线程利用率
  • 轻量级线程管理:基于 ForkJoinPool 的优化线程池

Fork 就是把一个大任务切分为若干子任务并行的执行,Join 就是合并这些子任务的执行结果,最后得到这个大任务的结 果。

比如计算 1+2+…+10000,可以分割成 10 个子任务,每个子任务分别对 1000 个数进行求和, 最终汇总这 10 个子任务的结果。

Fork/Join 的运行流程图如下:

工作窃取算法

Chaya:“码哥。有任务要拆分,那必然会出现分配不均匀的情况?要如何实现负载均衡呢?”

这个问题问得好,Chaya 小姐姐。

我们设计一个工作窃取算法(Work-Stealing)来解决这个问题。每个工作线程维护一个双端队列(Deque):

  • 头部:执行自己拆分出的任务(LIFO)
  • 尾部:窃取其他线程的任务(FIFO)

工作窃取算法的优点:充分利用线程进行并行计算,减少了线程间的竞争。

工作窃取算法的缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并 且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列。

任务拆分流程

使用场景

场景 1:大规模数据处理(并行排序)

需求:对 10 亿条数据排序,要求内存可控且充分利用多核性能。

代码实现

代码语言:javascript
代码运行次数:2
运行
AI代码解释
复制
public class ParallelMergeSort extends RecursiveAction {
    privatefinalint[] array;
    privatefinalint start;
    privatefinalint end;
    privatestaticfinalint THRESHOLD = 1_000_000; // 拆分阈值

    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            Arrays.sort(array, start, end);  // 小任务直接排序
            return;
        }

        int mid = (start + end) >>> 1;
        invokeAll(
            new ParallelMergeSort(array, start, mid),
            new ParallelMergeSort(array, mid, end)
        );

        merge(array, start, mid, end);  // 合并结果
    }

    // 生产级优化:复用临时数组减少内存分配
    private void merge(int[] array, int start, int mid, int end) {
        int[] temp = ThreadLocalRandom.current().ints().toArray();
        // ... 合并逻辑 ...
    }
}

// 使用方式
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
int[] data = loadHugeData();
pool.invoke(new ParallelMergeSort(data, 0, data.length));

性能优化点

  1. 合理设置 THRESHOLD(通过压测确定最佳值)
  2. 避免在递归中频繁创建临时数组
  3. 使用 ThreadLocalRandom 保证线程安全
场景 2:金融计算(蒙特卡洛模拟)

需求:快速计算期权定价,要求高精度且低延迟。

代码实现

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class MonteCarloTask extends RecursiveTask<Double> {
    privatefinalint iterations;
    privatestaticfinalint THRESHOLD = 10_000;

    @Override
    protected Double compute() {
        if (iterations <= THRESHOLD) {
            return calculateSync(); // 同步计算
        }

        MonteCarloTask left = new MonteCarloTask(iterations / 2);
        MonteCarloTask right = new MonteCarloTask(iterations / 2);
        left.fork();

        double rightResult = right.compute();
        double leftResult = left.join(); // 注意顺序:先计算再join

        return (leftResult + rightResult) / 2;
    }

    private double calculateSync() {
        double sum = 0;
        for (int i = 0; i < iterations; i++) {
            sum += randomSimulation();
        }
        return sum / iterations;
    }
}

// 生产级调用(指定超时)
ForkJoinPool pool = new ForkJoinPool(4);
MonteCarloTask task = new MonteCarloTask(1_000_000);
pool.submit(task);

try {
    double result = task.get(5, TimeUnit.SECONDS); // 严格超时控制
} catch (TimeoutException e) {
    task.cancel(true);
    // 降级策略...
}
ForkJoinPool 生产级配置

自定义线程工厂

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class NamedForkJoinThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
    privatefinal String namePrefix;
    privatefinal AtomicInteger counter = new AtomicInteger(1);

    @Override
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        ForkJoinWorkerThread thread = new ForkJoinWorkerThread(pool) {};
        thread.setName(namePrefix + "-" + counter.getAndIncrement());
        thread.setPriority(Thread.NORM_PRIORITY);
        thread.setDaemon(false); // 生产环境必须为非守护线程
        return thread;
    }
}

与其他并发框架对比

Fork/Join 适用场景

  1. 递归可分治的问题(排序、遍历、数学计算)
  2. 严格低延迟要求的计算任务
  3. 需要自动负载均衡的大规模数据处理

CountDownLatch

CountDownLatch 是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行。

例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行。

假如有这样一个需求:处理 10 万条数据,分片并行处理,全部完成后触发汇总操作。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class BatchProcessor {
    privatestaticfinalint BATCH_SIZE = 1000;
    privatefinal ExecutorService executor = Executors.newFixedThreadPool(8);

    public void process(List<Data> allData) {
        int total = allData.size();
        CountDownLatch latch = new CountDownLatch(total / BATCH_SIZE);

        for (int i = 0; i < total; i += BATCH_SIZE) {
            List<Data> batch = allData.subList(i, Math.min(i+BATCH_SIZE, total));
            executor.submit(() -> {
                try {
                    processBatch(batch);
                } finally {
                    latch.countDown(); // 确保计数减少
                }
            });
        }

        try {
            if (!latch.await(5, TimeUnit.MINUTES)) {
                thrownew TimeoutException("Batch processing timeout");
            }
            generateSummaryReport();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            shutdownNow();
        }
    }

    private void processBatch(List<Data> batch) { /* ... */ }
}

CountDownLatch 的构造函数接收一个 int 类型的参数作为计数器,如果你想等待 N 个点完 成,这里就传入 N。

当我们调用 CountDownLatch 的 countDown 方法时,N 就会减 1,CountDownLatch 的 await 方法 会阻塞当前线程,直到 N 变成零。

用在多个线程时,只需要把这个 CountDownLatch 的引用传递到线程里即可。

如果有某个线程处理得比较慢,我们不可能让主线程一直等待,所以可以使 用另外一个带指定时间的 await 方法——await(long time,TimeUnit unit),这个方法等待特定时 间后,就会不再阻塞当前线程。

实现原理

CountDownLatch 的核心实现原理是基于 AQSAQS 全称 AbstractQueuedSynchronizer,是 java.util.concurrent 中提供的一种高效且可扩展的同步机制;

它是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。

除了 CountDownLatch 工具类,JDK 当中的 SemaphoreReentrantLock 等工具类都是基于 AQS 来实现的。下面我们用 CountDownLatch 来分析一下 AQS 的实现。

在上一章《1.6w 字图解 Java 并发:多线程挑战、线程状态和通信、死锁;AQS、ReentrantLock、Condition 使用和原理》有详细详解 AQS。

CountDownLatch 的源码实现,发现其实它的代码实现非常简单,算上注释也才 300+ 行代码,如果去掉注释的话代码不到 100 行,大部分方法实现都是调用的 Sync 这个静态内部类的实现,而 Sync 就是继承自 AbstractQueuedSynchronizer

CountDownLatch 的 UML 类图如下:

核心代码如下。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private staticfinalclass Sync extends AbstractQueuedSynchronizer {
    Sync(int count) { setState(count); } // 初始化计数器

    // 尝试获取共享锁:当 state=0 时返回 1(成功),否则返回 -1(失败)
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    // 尝试释放共享锁:CAS 递减 state,直到变为 0
    protected boolean tryReleaseShared(int releases) {
        for (;;) { // 自旋保证原子性
            int c = getState();
            if (c == 0) returnfalse; // 已经释放完毕
            int nextc = c - 1;
            if (compareAndSetState(c, nextc)) // CAS 更新
                return nextc == 0; // 返回是否触发唤醒
        }
    }
}

Sync 重写了 AQS 中的 tryAcquireSharedtryReleaseShared 两个方法。

当调用 CountDownLatchawit() 方法时,会调用内部类 SyncacquireSharedInterruptibly() 方法,在这个方法中会调用 tryAcquireShared 方法,这个方法就是 Sync 重写的 AQS 中的方法;

调用 countDown() 方法原理基本类似。

await() 方法实现

在调用 await() 方法时,会直接调用 AQS 类的 acquireSharedInterruptibly 方法,在 acquireSharedInterruptibly 方法内部会继续调用 Sync 实现类中的 tryAcquireShared 方法,在 tryAcquireShared 方法中判断 state 变量值是否为 0

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1); // 进入 AQS 核心逻辑
}

// AQS 中的实现
public final void acquireSharedInterruptibly(int arg) {
    if (Thread.interrupted()) throw new InterruptedException();
    if (tryAcquireShared(arg) < 0) // 检查 state 是否为 0
        doAcquireSharedInterruptibly(arg); // 进入阻塞队列
}

doAcquireSharedInterruptibly 关键步骤。

关键点

  1. 节点入队:通过 addWaiter 方法将线程封装为 SHARED 模式节点加入队列尾部
  2. 自旋检查:循环判断前驱节点是否是头节点(公平性保证)
  3. 阻塞控制:调用 LockSupport.park() 挂起线程,响应中断。
countDown() 方法

当执行 CountDownLatchcountDown() 方法,将计数器减一,也就是将 state 值减一,当减到 0 的时候,等待队列中的线程被释放。是调用 AQSreleaseShared() 方法来实现的。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void countDown() {
    sync.releaseShared(1); // 触发释放操作
}

// AQS 中的实现
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { // CAS 递减 state
        doReleaseShared(); // 唤醒后续节点
        return true;
    }
    return false;
}

CyclicBarrier

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。

它要做的事情是,让一 组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会 开门,所有被屏障拦截的线程才会继续运行。

现实生活中我们经常会遇到这样的情景,在进行某个活动前需要等待人全部都齐了才开始。

例如吃饭时要等全家人都上座了才动筷子,旅游时要等全部人都到齐了才出发,比赛时要等运动员都上场后才开始。

CyclicBarrier 和 CountDownLatch 是不是很像,只是 CyclicBarrier 可以有不止一个栅栏,因为它的栅栏(Barrier)可以重复使用(Cyclic)。

CyclicBarrier 是 Java 并发包中的可重用同步屏障,其特性包括:

  • 多阶段协同:支持多次 await() 的同步点
  • 栅栏动作(Barrier Action):当所有线程抵达屏障时触发
  • 自动重置:每次所有线程通过屏障后自动复位
  • 中断处理:可响应线程中断并传播异常

如何使用

构造方法
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)

解析:

  • parties:传入一个计数器值,用来配置可以阻塞多少个线程的。
  • 第二个构造方法有一个 Runnable 参数,这个对象可以在计数器值减到 0 后,发起一次调用。

例如:下面代码就会在计数器减到 0 后,打印出"回环屏障退出"。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("回环屏障退出"));

场景 :多阶段分布式计算汇总

需求:实时计算商品库存分布(本地仓 + 区域仓 + 全国仓),需三阶段统计数据汇总。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class InventoryComputeService {
// 构建 3 线程等待 CyclicBarrier
    privatefinalint PARTIES = 3;
    privatefinal CyclicBarrier barrier = new CyclicBarrier(PARTIES, this::mergeData);
    // 保存最后的结果
    privatevolatile Map<String, Integer> result = new ConcurrentHashMap<>();

    public void compute() {
       // 异步执行 3 个任务,执行完成调用 barrier.await();,当所有任务完成后会执行 mergeData
        List<CompletableFuture<Void>> tasks = new ArrayList<>();
        tasks.add(computeLocalStock());
        tasks.add(computeRegionalStock());
        tasks.add(computeNationalStock());

        // 本次3 个任务任何一个计算出现异常的话,重置 barrier
        CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]))
                        .exceptionally(ex -> {
                            barrier.reset();  // 异常处理
                            returnnull;
                        });
    }

    private CompletableFuture<Void> computeLocalStock() {
      // 异步线程
        return CompletableFuture.runAsync(() -> {
            try {
                // 模拟计算耗时
                result.put("local", calculate(Region.LOCAL));
                // 执行完成,调用 await
                barrier.await(5, TimeUnit.SECONDS); // 超时控制
            } catch (Exception e) {
                handleException(e);
            }
        });
    }

    // computeRegionalStock/computeNationalStock 同理...

    private void mergeData() {
        lock.lock();
        try {
            System.out.println("各区域最终库存合并结果: " + result);
        } finally {
            lock.unlock();
            result.clear(); // 清空状态为下次计算准备
        }
    }
}

代码核心解释

构造方法创建等待三个线程执行完成的 CyclicBarrier,CyclicBarrierCountDownLatch 最大的区别是 CountDownLatch 一次性的,CyclicBarrier 是可循环利用的。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private final CyclicBarrier barrier = new CyclicBarrier(PARTIES, this::mergeData);

当三个线程都执行完成,会调用 mergeData 方法统计结果。

实现原理

核心数据结构
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class CyclicBarrier {
    privatefinal ReentrantLock lock = new ReentrantLock();
    privatefinal Condition trip = lock.newCondition();
    privatefinalint parties;    // 需要同步的线程数
    privatefinal Runnable barrierCommand; // 栅栏动作
    private Generation generation = new Generation(); // 当前代

    privatestaticclass Generation {
        boolean broken = false;   // 栅栏是否破裂
    }

    // 挂起线程数计数器(每次循环递减)
    privateint count;
}
CyclicBarrier 状态流转
await 实现
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        thrownew Error(toe); // cannot happen
    }
}

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
     // 回环屏障使用完毕或重置后,都会生成一个新的generation,这个对象可以用来让线程退出回环屏障
final Generation g = generation;

        // 每个进入的线程,都使计数器减1,当计数器归零后进入下面的if判断
        int index = --count;
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
             // 如果实例化时传入了Runnable对象,则在这里调用它的run()方法
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // 里面做了唤醒所有等待线程的操作,线程是在下面的自旋中挂起的
                nextGeneration();
                return0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        for (;;) {
         // 此处省略的线程被interrupt的try catch
         // 根据是否传入等待时间来判断调用哪一个方法
         if (!timed) {
          // condition的await()方法,这里会暂时释放锁
             trip.await();
         } elseif (nanos > 0L) {
       nanos = trip.awaitNanos(nanos);
   }

   // 计数器归零后,线程退出自旋
   if (g != generation) {
    return index;
   }
        }
    } finally {
        lock.unlock();
    }
}

上面代码中的trip就是一个 Condition 对象,是CyclicBarrier的一个成员变量。总结一下 doWait()方法,其实做的事情还是比较简单的。

线程进入 doWait(), 先抢占到锁 lock 锁对象,并执行计数器递减 1 的操作。 递减后的计数器值不为 0,则将自己挂起在 Condition 队列中。

递减后的计数器值为 0,则调用 signalAll()唤醒所有在条件队列中的线程,并创建新的 generation 对象,让线程可以退出回环屏障。

核心方法流程图如下。

Semaphore

Semaphore,它是一个信号量,主要作用是用来控制并发中同一个时刻执行的线程数量,可以用来做限流器,或者流程控制器。

在创建的时候会指定好它有多少个信号量,比如 Semaphre semaphore = new Semaphore(2),就只有 2 个信号量。

核心功能是控制同时访问特定资源的线程数量,具有以下特性:

  • 许可管理:通过 acquire()/release() 操作许可数量
  • 公平性选择:支持公平/非公平两种模式
  • 可中断:支持带超时的许可获取
  • 动态调整:运行时修改许可数量

这个信号量可以比作是车道,每一个时刻每条车道只能允许一辆汽车通过,你可以理解为高速收费站上的收费口,每个收费口任意一时刻只能允许一辆汽车通行。

画个图来讲解一下:

如何使用

接口限流(突发流量控制)。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class ApiRateLimiter {
    // 生产级配置:许可数 = QPS阈值 * 响应时间(秒)
    privatestaticfinal Semaphore SEMAPHORE = new Semaphore(500);
    privatestaticfinal Timer METRIC_TIMER = new Timer(true);

    static {
        // 监控线程:每10秒打印许可使用率
        METRIC_TIMER.schedule(new TimerTask() {
            public void run() {
                double usage = (SEMAPHORE.availablePermits() / 500.0) * 100;
                log.info("API许可使用率: {0}%", 100 - usage);
            }
        }, 10_000, 10_000);
    }

    public Response handleRequest(Request request) {
        if (!SEMAPHORE.tryAcquire(50, TimeUnit.MILLISECONDS)) { // 非阻塞获取
            thrownew BizException(429, "请求过于频繁");
        }

        try {
            return doBusinessLogic(request); // 核心业务逻辑
        } finally {
            SEMAPHORE.release(); // 确保释放许可
        }
    }
}

生产级要点

  1. 使用 tryAcquire 替代 acquire 避免线程阻塞
  2. 通过 finally 保证许可释放
  3. 集成监控上报(Prometheus + Grafana).

实现原理

Semaphore 有两种模式,公平模式和非公平模式,分别对应两个内部类为 FairSyncNonfairSync,这两个子类继承了 Sync,都是基于之前讲解过的 AQS 来实现的。

核心数据结构

画个图来说明一下内部的结构如下:

Semaphore 的公平模式依赖于 FairSync 公平同步器来实现,非公平模式依赖于 NonfairSync 非公平同步器来实现。

其中 FairSyncNonfairSync 继承自 Sync,而 Sync 又继承自 AQS,这些同步器的底层都是依赖于 AQS 提供的机制来实现的。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class Semaphore implements java.io.Serializable {
    privatefinal Sync sync;

    abstractstaticclass Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) { setState(permits); }
        final int getPermits() { return getState(); }
        // 非公平尝试获取许可
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }
        // 释放许可
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) thrownew Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    returntrue;
            }
        }
    }

    // 公平模式实现
    staticfinalclass FairSync extends Sync {
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors()) // 检查是否有等待线程
                    return -1;
                // ...与非公平模式相同
            }
        }
    }
}

所以掌握 AQS 很重要啊家人们,AQS 是模板方法模式的经典运用。

这里的 Semaphore 实现的思路跟我们之前讲过的 ReentrantLock 非常的相似,包括内部类的结构都是一样的,也是有公平和非公平两种模式。

只是不同的是 Semaphore 是共享锁,支持多个线程同时操作;然而 ReentrantLock 是互斥锁,同一个时刻只允许一个线程操作。

公平模式 acquire

公平模式,Semaphore.acquire 方法源码直接是调用 FairSyncacquireSharedInterruptibly,也就是进入了 AQS 的 acquireSharedInterruptibly 的模板方法里面了。

java.util.concurrent.Semaphore#acquire()源码如下。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

跳入 AQSacquireSharedInterruptibly 方法。

java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted() ||
        // Semaphore.FairSync 子类实现 tryAcquireShared
        (tryAcquireShared(arg) < 0 &&
         acquire(null, arg, true, true, false, 0L) < 0))
        throw new InterruptedException();
}

这个方法定义了一个模板流程:

  1. 先调用子类的 tryAcquireShared 方法获取共享锁,也就是获取信号量。
  2. 如果获取信号量成功,即返回值大于等于 0,则直接返回。
  3. 如果获取失败,返回值小于 0,则调用 AQS 的 doAcquireSharedInterruptibly 方法,进入 AQS 的等待队列里面,等待别人释放资源之后它再去获取。

这里我们画个图理解一下:

Semaphore.FairSync 子类实现 tryAcquireShared

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected int tryAcquireShared(int acquires) {
    for (;;) {
        // 这里作为公平模式,首先判断一下AQS等待队列里面
        // 有没有人在等待获取信号量,如果有人排队了,自己就不去获取了
        if (hasQueuedPredecessors())
            return -1;
        // 获取剩余的信号量资源
        int available = getState();
        // 剩余资源减去我需要的资源,是否小于0
        // 如果小于0则说明资源不够了
        // 如果大于等于0,说明资源是足够我使用的
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

上面的源码就是获取信号量的核心流程了:

  1. 首先判断一下 AQS 等待队列里面是否有人在排队,如果是,则自己不尝试获取资源了,乖乖的去排队
  2. 如果没有人在排队,获取一下当前剩余的信号量 available,然后减去自己需要的信号量 acquires,得到减去后的结果 remaining
  3. 如果 remaining 小于 0,直接返回 remaining,说明资源不够,获取失败了,这个时候就会进入 AQS 等待队列等待。
  4. 如果 remaining 大于等于 0,则执行 CAS 操作 compareAndSetState 竞争资源,如果成功了,说明自己获取信号量成功,如果失败了同样进入 AQS 等待队列。

我们画一下公平模式 FairSynctryAcquireShared 流程图,以及整个公平模式的 acquire 方法的流程图:

公平模式 release

看完获取,我们紧接着来看下释放,这里 Semaphorerelease 方法直接调用 SyncreleaseShared 方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  public void release() {
      sync.releaseShared(1);
  }

继续来分析 releaseShared 方法,进入到 AQS 的 releaseShard 释放资源的模板方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public final boolean releaseShared(int arg) {
    // 1. 调用子类的tryReleaseShared释放资源
    if (tryReleaseShared(arg)) {
        // 释放资源成功,调用doReleaseShared唤醒等待队列中等待资源的线程
        doReleaseShared();
        return true;
    }
    return false;
}

这里的模板流程有:

  1. 调用子类的 tryReleaseShared 去释放资源,即释放信号量
  2. 如果释放成功了,则调用 doReleaseShared 唤醒 AQS 中等待资源的线程,将资源传播下去,如果释放失败,即返回小于等于 0,则直接返回。
  3. 所以,这里除了 AQS 的核心模板流程之外,具体释放逻辑就是 SynctryReleaseShared 方法的源码了,我们继续来查看:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        // 这里就是将释放的信号量资源加回去而已
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // 尝试CAS设置资源,成功直接返回,失败则进入下一循环重试
        if (compareAndSetState(current, next))
            return true;
    }
}

释放资源的流程图如下:

Exchanger

Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger 用于进行线程间的数据交 换。

它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过 exchange 方法交换数据,如果第一个线程先执行 exchange()方法,它会一直等待第二个线程也 执行 exchange 方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产 出来的数据传递给对方。

使用场景

这个玩意的使用场景很少很少……大家对她有个了解即可,大可不必深入。

因为存在很多局限性。

  1. 仅限两个线程
  • 超过两个线程使用同一 Exchanger 会导致未定义行为。
  • 替代方案:使用 CyclicBarrierPhaser 实现多线程同步。
  1. 阻塞风险
  • 若一方线程未到达同步点,另一线程会永久阻塞。
  • 解决方案:使用带超时的 exchange(V x, long timeout, TimeUnit unit)
  1. 性能瓶颈
  • 频繁交换大数据对象会导致内存和 CPU 开销。
  • 优化建议:交换轻量级对象(如引用或标识符),而非完整数据。
  1. 不适用于分布式系统
  • Exchanger 仅限单 JVM 内的线程通信。
  • 替代方案消息队列(如 Kafka)或 RPC 框架(如 gRPC)。

Exchanger在多种并发编程场景中都非常有用。例如,在遗传算法中,可以使用Exchanger来实现个体之间的信息交换;在管道设计中,可以使用Exchanger来传递数据块或任务;在游戏中,可以使用Exchanger来实现玩家之间的物品交易等。

如下代码,用 Exchanger 实现两个线程将交换彼此持有的字符串数据:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import java.util.concurrent.Exchanger;

publicclass ExchangerExample {

    public static void main(String[] args) {
        // 创建一个Exchanger对象
        Exchanger<String> exchanger = new Exchanger<>();

        // 创建一个线程,它将使用"Hello"与另一个线程交换数据
        Thread producer = new Thread(() -> {
            try {
                String producedData = "Hello";
                String consumerData = exchanger.exchange(producedData);
                System.out.println("生产者线程交换后得到的数据: " + consumerData);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "生产者线程");

        // 创建一个线程,它将使用"World"与另一个线程交换数据
        Thread consumer = new Thread(() -> {
            try {
                String consumerData = "World";
                String producedData = exchanger.exchange(consumerData);
                System.out.println("消费者线程交换后得到的数据: " + producedData);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "消费者线程");

        // 启动线程
        producer.start();
        consumer.start();
    }
}

代码中我们创建了一个 Exchanger 对象,并且定义了两个线程:一个生产者线程和一个消费者线程。

生产者线程持有一个字符串 "Hello",而消费者线程持有一个字符串 "World"。两个线程都通过调用 exchanger.exchange() 方法来等待交换数据。

当两个线程都到达交换点时(即都调用了 exchange() 方法),Exchanger 会确保它们安全地交换数据。交换完成后,每个线程都会得到对方原本持有的数据,并打印出来。

实现原理

分别从数据结构和 exchange 方法来实现流程来学习实现原理。

核心数据结构

Participant 线程本地存储

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class Exchanger<V> {
    // 每个线程持有一个Node
    private final Participant participant;

    static final class Participant extends ThreadLocal<Node> {
        public Node initialValue() { return new Node(); }
    }
}


关键作用

  • 每个线程通过 Participant 持有独立的 Node 对象
  • 避免多线程竞争同一存储位置
  • 底层使用 ThreadLocal 实现线程隔离

Node 交换节点设计

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@sun.misc.Contended // 防止伪共享
static final class Node {
    int index;              // Arena下标
    int bound;              // 最近记录的前导边界
    int collides;           // CAS失败计数
    int hash;               // 伪随机自旋
    Object item;            // 携带的数据
    volatile Object match;  // 交换的数据
    volatile Thread parked; // 挂起的线程
}

内存布局优化

使用 @Contended 注解填充缓存行(64 字节)

确保不同线程访问的字段不在同一缓存行

示例内存布局:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
| 64字节缓存行 | Node.item | ...填充... |
| 64字节缓存行 | Node.match | ...填充... |

每个线程的 Node 有一个 match 属性用于存储待交换的数据。

exchange 方法执行流程

主流程源码(精简版)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public V exchange(V x) throws InterruptedException {
    Object v;
    Node[] a;
    Node q = participant.get();

    // Arena模式(
    if ((a = arena) != null ||
        (q = slotExchange(q, x, false, 0L)) == null)
        return (V)v;
    // ...省略超时处理
}

private final Object slotExchange(Node q, Object x, boolean timed, long nanos) {
    // 核心交换逻辑(
    for (;;) {
        if (slot != null) { // 存在等待节点
            Node node = (Node)slot;
            if (U.compareAndSwapObject(this, SLOT, node, null)) {
                Object v = node.item;
                node.match = x; // 数据交换(
                Thread t = node.parked;
                if (t != null)
                    U.unpark(t); // 唤醒对方线程
                return v;
            }
        } elseif (U.compareAndSwapObject(this, SLOT, null, q)) {
            // 挂起当前线程
            return timed ? awaitNanos(q, nanos) : await(q);
        }
    }
}

关键步骤解释

  1. CAS 设置槽位U.compareAndSwapObject(this, SLOT, null, q)
  2. 数据交换:直接修改对方节点的 match 字段
  3. 唤醒机制:通过 Unsafe.unpark() 解除线程阻塞
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-04-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码哥跳动 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
JUC并发—7.AQS源码分析三
volatile、synchronized、CAS、AQS、读写锁、锁优化和锁故障、并发集合、线程池、同步组件
东阳马生架构
2025/04/27
560
java架构之路(多线程)JUC并发编程之Semaphore信号量、CountDownLatch、CyclicBarrier栅栏、Executors线程池
  上次博客我们主要说了我们juc并发包下面的ReetrantLock的一些简单使用和底层的原理,是如何实现公平锁、非公平锁的。内部的双向链表到底是什么意思,prev和next到底是什么,为什么要引入heap和tail来值向null的Node节点。高并发时候是如何保证state来记录重入锁的,在我们的上次博客都做了详细的说明。这次我们来聊一些简单易懂且实用的AQS中的工具类。
小菜的不能再菜
2020/02/24
4530
Java并发:Semaphore信号量源码分析
JUC 中 Semaphore 的使用与原理分析,Semaphore 也是 Java 中的一个同步器,与 CountDownLatch 和 CycleBarrier 不同在于它内部的计数器是递增的,那么,Semaphore 的内部实现是怎样的呢?
搜云库技术团队
2019/10/18
1K0
AQS、CountDownLatch、CyclicBarrier、Semaphore、Exchanger
在编程的竞技场中,多线程与并发是每一级大牛必备的技能!今天,就让我们深入了解Java并发武器库中的“五神兵”——AQS、CountDownLatch、CyclicBarrier、Semaphore、Exchanger的强大之处。他们如棋盘上的棋子,既能彼此协调,又能独当一面,解决了无数线程之问的冲突、同步与协作难题。
疯狂的KK
2024/06/04
1600
AQS、CountDownLatch、CyclicBarrier、Semaphore、Exchanger
Java并发工具类(JUC)
在J.U.C包中,提供了几个非常有用的并发工具类,CountDownLatch、CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类则提供了在线程间交换数据的一种手段。
chenchenchen
2022/01/05
8790
AQS-semaphore&CyclicBarrier&CountDownLatch源码学习
semaphore&cyclicbarrier&CountDownLatch的介绍
逍遥壮士
2023/02/28
2610
AQS-semaphore&CyclicBarrier&CountDownLatch源码学习
CyclicBarrier、CountDownLatch以及Semaphore使用及其原理分析
CyclicBarrier、CountDownLatch以及Semaphore是Java并发包中几个常用的并发组件,这几个组件特点是功能相识很容易混淆。首先我们分别介绍这几个组件的功能然后再通过实例分析和源码分析其中设计原理。
用户1263954
2018/07/30
5210
CyclicBarrier、CountDownLatch以及Semaphore使用及其原理分析
Java Review - 并发编程_ 信号量Semaphore原理&源码剖析
Semaphore信号量也是Java中的一个同步器,与CountDownLatch和CycleBarrier不同的是,它内部的计数器是递增的,并且在一开始初始化Semaphore时可以指定一个初始值,但是并不需要知道需要同步的线程个数,而是在需要同步的地方调用acquire方法时指定需要同步的线程个数。
小小工匠
2021/12/30
3680
Java Review - 并发编程_ 信号量Semaphore原理&源码剖析
Java 线程同步组件 CountDownLatch 与 CyclicBarrier 原理分析
在分析完AbstractQueuedSynchronizer(以下简称 AQS)和ReentrantLock的原理后,本文将分析 java.util.concurrent 包下的两个线程同步组件CountDownLatch和CyclicBarrier。这两个同步组件比较常用,也经常被放在一起对比。通过分析这两个同步组件,可使我们对 Java 线程间协同有更深入的了解。同时通过分析其原理,也可使我们做到知其然,并知其所以然。
田小波
2018/05/11
2.3K14
Java 线程同步组件 CountDownLatch 与 CyclicBarrier 原理分析
《吃透Java》- 并发何须惧,工具来相助!
作为一名躺平的搬砖工程师,在内卷时期,慢条斯理地搬砖可能已经离你而去。砖是一种共享资源,现如今每个搬砖工都想追求质量的又要同时保持高效的搬砖速率,在争夺的情况下会不会出现并发的情况?你搬过的砖却计算在别人的KPI上,原本只想 躺平,却没想到躺平也要遭受如此不公!原本只需煎一面的咸鱼,现在还得把另一面翻过来再煎~!
蔡不菜丶
2021/07/23
2270
《吃透Java》- 并发何须惧,工具来相助!
从一道面试题进入Java并发新机制---J.U.C
看到这道题,我首先想到的是synchronized + wait/notify,具体实现为:
行百里er
2020/12/02
3050
从一道面试题进入Java并发新机制---J.U.C
面经手册 · 第18篇《AQS 共享锁,Semaphore、CountDownLatch,听说数据库连接池可以用到!》
其实并没有一天的突飞猛进,也没有一口吃出来的胖子。有得更多的时候日积月累、不断沉淀,最后才能爆发、破局!
小傅哥
2020/11/23
4090
面经手册 · 第18篇《AQS 共享锁,Semaphore、CountDownLatch,听说数据库连接池可以用到!》
JAVA面试备战(十)--Semaphore 源码分析
Semaphore(信号量)也是常用的并发工具之一,它常常用于流量控制。通常情况下,公共的资源常常是有限的,例如数据库的连接数。使用Semaphore可以帮助我们有效的管理这些有限资源的使用。
程序员爱酸奶
2022/04/12
3280
JUC 常用 4 大并发工具类 CountDownLatch、CyclicBarrier、Semaphore、ExChanger
JUC就是java.util.concurrent包,这个包俗称JUC,里面都是解决并发问题的一些定义类,该包的位置位于java下面的rt.jar包下面。
鱼找水需要时间
2023/02/16
4690
JUC 常用 4 大并发工具类 CountDownLatch、CyclicBarrier、Semaphore、ExChanger
终于有人把 CountDownLatch,CyclicBarrier,Semaphore 说明白了!
在 JUC 下包含了一些常用的同步工具类,今天就来详细介绍一下,CountDownLatch,CyclicBarrier,Semaphore 的使用方法以及它们之间的区别。
敖丙
2020/05/26
8090
终于有人把 CountDownLatch,CyclicBarrier,Semaphore 说明白了!
JAVA并发编程系列(7)Semaphore信号量剖析
其实,面对这样的面试要求,现实中的头部大厂,甚至一些普通大厂都是设计了很多编程题考查大家的基础功底。但是都不会很复杂,毕竟时间有限,往往都是经典题目,涉及一个或多个核心关键技术点。
拉丁解牛说技术
2024/09/18
1350
Java同步组件之CountDownLatch,Semaphore
Java同步组件概况 CountDownLatch : 是闭锁,通过一个计数来保证线程是否一直阻塞 Semaphore: 控制同一时间,并发线程数量 CyclicBarrier:字面意思是回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。 ReentrantLock:是一个重入锁,一个线程获得了锁之后仍然可以反复加锁,不会出现自己阻塞自己的情况。 Condition:配合ReentrantLock,实现等待/通知模型 FutureTask:FutureTask实现了接口Future,同Fu
开源日记
2021/02/05
5340
高并发之ReentrantLock、CountDownLatch、CyclicBarrier
ReentrantLock重入锁,是实现Lock接口的一个类,也是在实际编程中使用频率很高的一个锁,支持重入性,表示能够对共享资源能够重复加锁,即当前线程获取该锁再次获取不会被阻塞。在java关键字synchronized隐式支持重入性(关于synchronized可以看这篇文章),synchronized通过获取自增,释放自减的方式实现重入。与此同时,ReentrantLock还支持公平锁和非公平锁两种方式。那么,要想完完全全的弄懂ReentrantLock的话,主要也就是ReentrantLock同步语义的学习:1. 重入性的实现原理;2. 公平锁和非公平锁。
用户1289394
2021/03/11
3760
高并发之ReentrantLock、CountDownLatch、CyclicBarrier
JAVA并发编程系列(8)CountDownLatch核心原理
我们利用CountDownLatch倒计时的特性,多线程并发条件下,多线程可以调用CountDownLatch.countDown()方法进行减1,然后等候信号的线程调用CountDownLatch.await()方法,等待CountDownLatch倒数为0,会被唤醒继续执行。
拉丁解牛说技术
2024/09/19
1800
CountDownLatch和CyclicBarrier 傻傻的分不清?超长精美图文又来了
并发编程的三大核心是分工,同步和互斥。在日常开发中,经常会碰到需要在主线程中开启多个子线程去并行的执行任务,并且主线程需要等待所有子线程执行完毕再进行汇总的场景,这就涉及到分工与同步的内容了
用户4172423
2020/07/03
5090
CountDownLatch和CyclicBarrier 傻傻的分不清?超长精美图文又来了
推荐阅读
相关推荐
JUC并发—7.AQS源码分析三
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档