前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >万字图解 Java 并发框架:Fork/Join、CountDownLatch、Semaphore、CyclicBarrier

万字图解 Java 并发框架:Fork/Join、CountDownLatch、Semaphore、CyclicBarrier

作者头像
码哥字节
发布于 2025-04-23 04:39:33
发布于 2025-04-23 04:39:33
41302
代码可运行
举报
文章被收录于专栏:Java 技术栈Java 技术栈
运行总次数:2
代码可运行

本文图多,内容硬核,建议收藏。

第一章节《1.6w 字图解 Java 并发:多线程挑战、线程状态和通信、死锁;AQS、ReentrantLock、Condition 使用和原理》,我们开启了 Java 高并发系列的学习,透彻理解 Java 并发编程基础,主要内容有:

  1. 多线程挑战与难点
    1. 上下文切换
    2. 死锁
    3. 资源限制的挑战
    4. 什么是线程
    5. 线程的状态
    6. 线程间的通信
  2. Java 各种各样的锁使用和原理
    1. syncconized 的使用和原理
    2. AQS 实现原理
    3. ReentrantLock 的使用和原理
    4. ReentrantReadWriteLock 读写锁使用和原理
    5. Condition 的使用和原理

第二章《1.8w 字图解 Java 并发容器: CHM、ConcurrentLinkedQueue、7 种阻塞队列的使用和原理》主要内容如下:

  • ConcurrentHashMap的使用和原理
  • ConcurrentLinkedQueue 的使用和原理
  • Java 7 种阻塞队列的使用和原理详解:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、SynchronousQueue、LinkedTransferQueue以及LinkedBlockingDeque

第三章节围绕着 Java 并发框架展开,主要内容如下:

  • Fork/Join 框架的使用和原理
  • CountDownLatch的使用和原理
  • CyclicBarrier的使用和原理
  • Semaphore的使用和原理
  • Exchanger 的使用和原理

开搞!

Fork/Join 框架

Chaya:码哥,什么是 Fork/Join 框架?

Fork/Join 是 Java 7 引入的并行计算框架,核心思想是 **"分而治之"**。它通过以下特性解决复杂计算问题:

  • 自动任务拆分:将大任务递归拆分为子任务
  • 工作窃取算法(Work-Stealing):最大化线程利用率
  • 轻量级线程管理:基于 ForkJoinPool 的优化线程池

Fork 就是把一个大任务切分为若干子任务并行的执行,Join 就是合并这些子任务的执行结果,最后得到这个大任务的结 果。

比如计算 1+2+…+10000,可以分割成 10 个子任务,每个子任务分别对 1000 个数进行求和, 最终汇总这 10 个子任务的结果。

Fork/Join 的运行流程图如下:

工作窃取算法

Chaya:“码哥。有任务要拆分,那必然会出现分配不均匀的情况?要如何实现负载均衡呢?”

这个问题问得好,Chaya 小姐姐。

我们设计一个工作窃取算法(Work-Stealing)来解决这个问题。每个工作线程维护一个双端队列(Deque):

  • 头部:执行自己拆分出的任务(LIFO)
  • 尾部:窃取其他线程的任务(FIFO)

工作窃取算法的优点:充分利用线程进行并行计算,减少了线程间的竞争。

工作窃取算法的缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并 且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列。

任务拆分流程

使用场景

场景 1:大规模数据处理(并行排序)

需求:对 10 亿条数据排序,要求内存可控且充分利用多核性能。

代码实现

代码语言:javascript
代码运行次数:2
运行
AI代码解释
复制
public class ParallelMergeSort extends RecursiveAction {
    privatefinalint[] array;
    privatefinalint start;
    privatefinalint end;
    privatestaticfinalint THRESHOLD = 1_000_000; // 拆分阈值

    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            Arrays.sort(array, start, end);  // 小任务直接排序
            return;
        }

        int mid = (start + end) >>> 1;
        invokeAll(
            new ParallelMergeSort(array, start, mid),
            new ParallelMergeSort(array, mid, end)
        );

        merge(array, start, mid, end);  // 合并结果
    }

    // 生产级优化:复用临时数组减少内存分配
    private void merge(int[] array, int start, int mid, int end) {
        int[] temp = ThreadLocalRandom.current().ints().toArray();
        // ... 合并逻辑 ...
    }
}

// 使用方式
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
int[] data = loadHugeData();
pool.invoke(new ParallelMergeSort(data, 0, data.length));

性能优化点

  1. 合理设置 THRESHOLD(通过压测确定最佳值)
  2. 避免在递归中频繁创建临时数组
  3. 使用 ThreadLocalRandom 保证线程安全
场景 2:金融计算(蒙特卡洛模拟)

需求:快速计算期权定价,要求高精度且低延迟。

代码实现

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class MonteCarloTask extends RecursiveTask<Double> {
    privatefinalint iterations;
    privatestaticfinalint THRESHOLD = 10_000;

    @Override
    protected Double compute() {
        if (iterations <= THRESHOLD) {
            return calculateSync(); // 同步计算
        }

        MonteCarloTask left = new MonteCarloTask(iterations / 2);
        MonteCarloTask right = new MonteCarloTask(iterations / 2);
        left.fork();

        double rightResult = right.compute();
        double leftResult = left.join(); // 注意顺序:先计算再join

        return (leftResult + rightResult) / 2;
    }

    private double calculateSync() {
        double sum = 0;
        for (int i = 0; i < iterations; i++) {
            sum += randomSimulation();
        }
        return sum / iterations;
    }
}

// 生产级调用(指定超时)
ForkJoinPool pool = new ForkJoinPool(4);
MonteCarloTask task = new MonteCarloTask(1_000_000);
pool.submit(task);

try {
    double result = task.get(5, TimeUnit.SECONDS); // 严格超时控制
} catch (TimeoutException e) {
    task.cancel(true);
    // 降级策略...
}
ForkJoinPool 生产级配置

自定义线程工厂

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class NamedForkJoinThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
    privatefinal String namePrefix;
    privatefinal AtomicInteger counter = new AtomicInteger(1);

    @Override
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        ForkJoinWorkerThread thread = new ForkJoinWorkerThread(pool) {};
        thread.setName(namePrefix + "-" + counter.getAndIncrement());
        thread.setPriority(Thread.NORM_PRIORITY);
        thread.setDaemon(false); // 生产环境必须为非守护线程
        return thread;
    }
}

与其他并发框架对比

Fork/Join 适用场景

  1. 递归可分治的问题(排序、遍历、数学计算)
  2. 严格低延迟要求的计算任务
  3. 需要自动负载均衡的大规模数据处理

CountDownLatch

CountDownLatch 是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行。

例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行。

假如有这样一个需求:处理 10 万条数据,分片并行处理,全部完成后触发汇总操作。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class BatchProcessor {
    privatestaticfinalint BATCH_SIZE = 1000;
    privatefinal ExecutorService executor = Executors.newFixedThreadPool(8);

    public void process(List<Data> allData) {
        int total = allData.size();
        CountDownLatch latch = new CountDownLatch(total / BATCH_SIZE);

        for (int i = 0; i < total; i += BATCH_SIZE) {
            List<Data> batch = allData.subList(i, Math.min(i+BATCH_SIZE, total));
            executor.submit(() -> {
                try {
                    processBatch(batch);
                } finally {
                    latch.countDown(); // 确保计数减少
                }
            });
        }

        try {
            if (!latch.await(5, TimeUnit.MINUTES)) {
                thrownew TimeoutException("Batch processing timeout");
            }
            generateSummaryReport();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            shutdownNow();
        }
    }

    private void processBatch(List<Data> batch) { /* ... */ }
}

CountDownLatch 的构造函数接收一个 int 类型的参数作为计数器,如果你想等待 N 个点完 成,这里就传入 N。

当我们调用 CountDownLatch 的 countDown 方法时,N 就会减 1,CountDownLatch 的 await 方法 会阻塞当前线程,直到 N 变成零。

用在多个线程时,只需要把这个 CountDownLatch 的引用传递到线程里即可。

如果有某个线程处理得比较慢,我们不可能让主线程一直等待,所以可以使 用另外一个带指定时间的 await 方法——await(long time,TimeUnit unit),这个方法等待特定时 间后,就会不再阻塞当前线程。

实现原理

CountDownLatch 的核心实现原理是基于 AQSAQS 全称 AbstractQueuedSynchronizer,是 java.util.concurrent 中提供的一种高效且可扩展的同步机制;

它是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。

除了 CountDownLatch 工具类,JDK 当中的 SemaphoreReentrantLock 等工具类都是基于 AQS 来实现的。下面我们用 CountDownLatch 来分析一下 AQS 的实现。

在上一章《1.6w 字图解 Java 并发:多线程挑战、线程状态和通信、死锁;AQS、ReentrantLock、Condition 使用和原理》有详细详解 AQS。

CountDownLatch 的源码实现,发现其实它的代码实现非常简单,算上注释也才 300+ 行代码,如果去掉注释的话代码不到 100 行,大部分方法实现都是调用的 Sync 这个静态内部类的实现,而 Sync 就是继承自 AbstractQueuedSynchronizer

CountDownLatch 的 UML 类图如下:

核心代码如下。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private staticfinalclass Sync extends AbstractQueuedSynchronizer {
    Sync(int count) { setState(count); } // 初始化计数器

    // 尝试获取共享锁:当 state=0 时返回 1(成功),否则返回 -1(失败)
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    // 尝试释放共享锁:CAS 递减 state,直到变为 0
    protected boolean tryReleaseShared(int releases) {
        for (;;) { // 自旋保证原子性
            int c = getState();
            if (c == 0) returnfalse; // 已经释放完毕
            int nextc = c - 1;
            if (compareAndSetState(c, nextc)) // CAS 更新
                return nextc == 0; // 返回是否触发唤醒
        }
    }
}

Sync 重写了 AQS 中的 tryAcquireSharedtryReleaseShared 两个方法。

当调用 CountDownLatchawit() 方法时,会调用内部类 SyncacquireSharedInterruptibly() 方法,在这个方法中会调用 tryAcquireShared 方法,这个方法就是 Sync 重写的 AQS 中的方法;

调用 countDown() 方法原理基本类似。

await() 方法实现

在调用 await() 方法时,会直接调用 AQS 类的 acquireSharedInterruptibly 方法,在 acquireSharedInterruptibly 方法内部会继续调用 Sync 实现类中的 tryAcquireShared 方法,在 tryAcquireShared 方法中判断 state 变量值是否为 0

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1); // 进入 AQS 核心逻辑
}

// AQS 中的实现
public final void acquireSharedInterruptibly(int arg) {
    if (Thread.interrupted()) throw new InterruptedException();
    if (tryAcquireShared(arg) < 0) // 检查 state 是否为 0
        doAcquireSharedInterruptibly(arg); // 进入阻塞队列
}

doAcquireSharedInterruptibly 关键步骤。

关键点

  1. 节点入队:通过 addWaiter 方法将线程封装为 SHARED 模式节点加入队列尾部
  2. 自旋检查:循环判断前驱节点是否是头节点(公平性保证)
  3. 阻塞控制:调用 LockSupport.park() 挂起线程,响应中断。
countDown() 方法

当执行 CountDownLatchcountDown() 方法,将计数器减一,也就是将 state 值减一,当减到 0 的时候,等待队列中的线程被释放。是调用 AQSreleaseShared() 方法来实现的。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void countDown() {
    sync.releaseShared(1); // 触发释放操作
}

// AQS 中的实现
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { // CAS 递减 state
        doReleaseShared(); // 唤醒后续节点
        return true;
    }
    return false;
}

CyclicBarrier

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。

它要做的事情是,让一 组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会 开门,所有被屏障拦截的线程才会继续运行。

现实生活中我们经常会遇到这样的情景,在进行某个活动前需要等待人全部都齐了才开始。

例如吃饭时要等全家人都上座了才动筷子,旅游时要等全部人都到齐了才出发,比赛时要等运动员都上场后才开始。

CyclicBarrier 和 CountDownLatch 是不是很像,只是 CyclicBarrier 可以有不止一个栅栏,因为它的栅栏(Barrier)可以重复使用(Cyclic)。

CyclicBarrier 是 Java 并发包中的可重用同步屏障,其特性包括:

  • 多阶段协同:支持多次 await() 的同步点
  • 栅栏动作(Barrier Action):当所有线程抵达屏障时触发
  • 自动重置:每次所有线程通过屏障后自动复位
  • 中断处理:可响应线程中断并传播异常

如何使用

构造方法
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)

解析:

  • parties:传入一个计数器值,用来配置可以阻塞多少个线程的。
  • 第二个构造方法有一个 Runnable 参数,这个对象可以在计数器值减到 0 后,发起一次调用。

例如:下面代码就会在计数器减到 0 后,打印出"回环屏障退出"。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("回环屏障退出"));

场景 :多阶段分布式计算汇总

需求:实时计算商品库存分布(本地仓 + 区域仓 + 全国仓),需三阶段统计数据汇总。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class InventoryComputeService {
// 构建 3 线程等待 CyclicBarrier
    privatefinalint PARTIES = 3;
    privatefinal CyclicBarrier barrier = new CyclicBarrier(PARTIES, this::mergeData);
    // 保存最后的结果
    privatevolatile Map<String, Integer> result = new ConcurrentHashMap<>();

    public void compute() {
       // 异步执行 3 个任务,执行完成调用 barrier.await();,当所有任务完成后会执行 mergeData
        List<CompletableFuture<Void>> tasks = new ArrayList<>();
        tasks.add(computeLocalStock());
        tasks.add(computeRegionalStock());
        tasks.add(computeNationalStock());

        // 本次3 个任务任何一个计算出现异常的话,重置 barrier
        CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]))
                        .exceptionally(ex -> {
                            barrier.reset();  // 异常处理
                            returnnull;
                        });
    }

    private CompletableFuture<Void> computeLocalStock() {
      // 异步线程
        return CompletableFuture.runAsync(() -> {
            try {
                // 模拟计算耗时
                result.put("local", calculate(Region.LOCAL));
                // 执行完成,调用 await
                barrier.await(5, TimeUnit.SECONDS); // 超时控制
            } catch (Exception e) {
                handleException(e);
            }
        });
    }

    // computeRegionalStock/computeNationalStock 同理...

    private void mergeData() {
        lock.lock();
        try {
            System.out.println("各区域最终库存合并结果: " + result);
        } finally {
            lock.unlock();
            result.clear(); // 清空状态为下次计算准备
        }
    }
}

代码核心解释

构造方法创建等待三个线程执行完成的 CyclicBarrier,CyclicBarrierCountDownLatch 最大的区别是 CountDownLatch 一次性的,CyclicBarrier 是可循环利用的。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private final CyclicBarrier barrier = new CyclicBarrier(PARTIES, this::mergeData);

当三个线程都执行完成,会调用 mergeData 方法统计结果。

实现原理

核心数据结构
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class CyclicBarrier {
    privatefinal ReentrantLock lock = new ReentrantLock();
    privatefinal Condition trip = lock.newCondition();
    privatefinalint parties;    // 需要同步的线程数
    privatefinal Runnable barrierCommand; // 栅栏动作
    private Generation generation = new Generation(); // 当前代

    privatestaticclass Generation {
        boolean broken = false;   // 栅栏是否破裂
    }

    // 挂起线程数计数器(每次循环递减)
    privateint count;
}
CyclicBarrier 状态流转
await 实现
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        thrownew Error(toe); // cannot happen
    }
}

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
     // 回环屏障使用完毕或重置后,都会生成一个新的generation,这个对象可以用来让线程退出回环屏障
final Generation g = generation;

        // 每个进入的线程,都使计数器减1,当计数器归零后进入下面的if判断
        int index = --count;
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
             // 如果实例化时传入了Runnable对象,则在这里调用它的run()方法
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // 里面做了唤醒所有等待线程的操作,线程是在下面的自旋中挂起的
                nextGeneration();
                return0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        for (;;) {
         // 此处省略的线程被interrupt的try catch
         // 根据是否传入等待时间来判断调用哪一个方法
         if (!timed) {
          // condition的await()方法,这里会暂时释放锁
             trip.await();
         } elseif (nanos > 0L) {
       nanos = trip.awaitNanos(nanos);
   }

   // 计数器归零后,线程退出自旋
   if (g != generation) {
    return index;
   }
        }
    } finally {
        lock.unlock();
    }
}

上面代码中的trip就是一个 Condition 对象,是CyclicBarrier的一个成员变量。总结一下 doWait()方法,其实做的事情还是比较简单的。

线程进入 doWait(), 先抢占到锁 lock 锁对象,并执行计数器递减 1 的操作。 递减后的计数器值不为 0,则将自己挂起在 Condition 队列中。

递减后的计数器值为 0,则调用 signalAll()唤醒所有在条件队列中的线程,并创建新的 generation 对象,让线程可以退出回环屏障。

核心方法流程图如下。

Semaphore

Semaphore,它是一个信号量,主要作用是用来控制并发中同一个时刻执行的线程数量,可以用来做限流器,或者流程控制器。

在创建的时候会指定好它有多少个信号量,比如 Semaphre semaphore = new Semaphore(2),就只有 2 个信号量。

核心功能是控制同时访问特定资源的线程数量,具有以下特性:

  • 许可管理:通过 acquire()/release() 操作许可数量
  • 公平性选择:支持公平/非公平两种模式
  • 可中断:支持带超时的许可获取
  • 动态调整:运行时修改许可数量

这个信号量可以比作是车道,每一个时刻每条车道只能允许一辆汽车通过,你可以理解为高速收费站上的收费口,每个收费口任意一时刻只能允许一辆汽车通行。

画个图来讲解一下:

如何使用

接口限流(突发流量控制)。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class ApiRateLimiter {
    // 生产级配置:许可数 = QPS阈值 * 响应时间(秒)
    privatestaticfinal Semaphore SEMAPHORE = new Semaphore(500);
    privatestaticfinal Timer METRIC_TIMER = new Timer(true);

    static {
        // 监控线程:每10秒打印许可使用率
        METRIC_TIMER.schedule(new TimerTask() {
            public void run() {
                double usage = (SEMAPHORE.availablePermits() / 500.0) * 100;
                log.info("API许可使用率: {0}%", 100 - usage);
            }
        }, 10_000, 10_000);
    }

    public Response handleRequest(Request request) {
        if (!SEMAPHORE.tryAcquire(50, TimeUnit.MILLISECONDS)) { // 非阻塞获取
            thrownew BizException(429, "请求过于频繁");
        }

        try {
            return doBusinessLogic(request); // 核心业务逻辑
        } finally {
            SEMAPHORE.release(); // 确保释放许可
        }
    }
}

生产级要点

  1. 使用 tryAcquire 替代 acquire 避免线程阻塞
  2. 通过 finally 保证许可释放
  3. 集成监控上报(Prometheus + Grafana).

实现原理

Semaphore 有两种模式,公平模式和非公平模式,分别对应两个内部类为 FairSyncNonfairSync,这两个子类继承了 Sync,都是基于之前讲解过的 AQS 来实现的。

核心数据结构

画个图来说明一下内部的结构如下:

Semaphore 的公平模式依赖于 FairSync 公平同步器来实现,非公平模式依赖于 NonfairSync 非公平同步器来实现。

其中 FairSyncNonfairSync 继承自 Sync,而 Sync 又继承自 AQS,这些同步器的底层都是依赖于 AQS 提供的机制来实现的。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class Semaphore implements java.io.Serializable {
    privatefinal Sync sync;

    abstractstaticclass Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) { setState(permits); }
        final int getPermits() { return getState(); }
        // 非公平尝试获取许可
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }
        // 释放许可
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) thrownew Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    returntrue;
            }
        }
    }

    // 公平模式实现
    staticfinalclass FairSync extends Sync {
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors()) // 检查是否有等待线程
                    return -1;
                // ...与非公平模式相同
            }
        }
    }
}

所以掌握 AQS 很重要啊家人们,AQS 是模板方法模式的经典运用。

这里的 Semaphore 实现的思路跟我们之前讲过的 ReentrantLock 非常的相似,包括内部类的结构都是一样的,也是有公平和非公平两种模式。

只是不同的是 Semaphore 是共享锁,支持多个线程同时操作;然而 ReentrantLock 是互斥锁,同一个时刻只允许一个线程操作。

公平模式 acquire

公平模式,Semaphore.acquire 方法源码直接是调用 FairSyncacquireSharedInterruptibly,也就是进入了 AQS 的 acquireSharedInterruptibly 的模板方法里面了。

java.util.concurrent.Semaphore#acquire()源码如下。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

跳入 AQSacquireSharedInterruptibly 方法。

java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted() ||
        // Semaphore.FairSync 子类实现 tryAcquireShared
        (tryAcquireShared(arg) < 0 &&
         acquire(null, arg, true, true, false, 0L) < 0))
        throw new InterruptedException();
}

这个方法定义了一个模板流程:

  1. 先调用子类的 tryAcquireShared 方法获取共享锁,也就是获取信号量。
  2. 如果获取信号量成功,即返回值大于等于 0,则直接返回。
  3. 如果获取失败,返回值小于 0,则调用 AQS 的 doAcquireSharedInterruptibly 方法,进入 AQS 的等待队列里面,等待别人释放资源之后它再去获取。

这里我们画个图理解一下:

Semaphore.FairSync 子类实现 tryAcquireShared

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected int tryAcquireShared(int acquires) {
    for (;;) {
        // 这里作为公平模式,首先判断一下AQS等待队列里面
        // 有没有人在等待获取信号量,如果有人排队了,自己就不去获取了
        if (hasQueuedPredecessors())
            return -1;
        // 获取剩余的信号量资源
        int available = getState();
        // 剩余资源减去我需要的资源,是否小于0
        // 如果小于0则说明资源不够了
        // 如果大于等于0,说明资源是足够我使用的
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

上面的源码就是获取信号量的核心流程了:

  1. 首先判断一下 AQS 等待队列里面是否有人在排队,如果是,则自己不尝试获取资源了,乖乖的去排队
  2. 如果没有人在排队,获取一下当前剩余的信号量 available,然后减去自己需要的信号量 acquires,得到减去后的结果 remaining
  3. 如果 remaining 小于 0,直接返回 remaining,说明资源不够,获取失败了,这个时候就会进入 AQS 等待队列等待。
  4. 如果 remaining 大于等于 0,则执行 CAS 操作 compareAndSetState 竞争资源,如果成功了,说明自己获取信号量成功,如果失败了同样进入 AQS 等待队列。

我们画一下公平模式 FairSynctryAcquireShared 流程图,以及整个公平模式的 acquire 方法的流程图:

公平模式 release

看完获取,我们紧接着来看下释放,这里 Semaphorerelease 方法直接调用 SyncreleaseShared 方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  public void release() {
      sync.releaseShared(1);
  }

继续来分析 releaseShared 方法,进入到 AQS 的 releaseShard 释放资源的模板方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public final boolean releaseShared(int arg) {
    // 1. 调用子类的tryReleaseShared释放资源
    if (tryReleaseShared(arg)) {
        // 释放资源成功,调用doReleaseShared唤醒等待队列中等待资源的线程
        doReleaseShared();
        return true;
    }
    return false;
}

这里的模板流程有:

  1. 调用子类的 tryReleaseShared 去释放资源,即释放信号量
  2. 如果释放成功了,则调用 doReleaseShared 唤醒 AQS 中等待资源的线程,将资源传播下去,如果释放失败,即返回小于等于 0,则直接返回。
  3. 所以,这里除了 AQS 的核心模板流程之外,具体释放逻辑就是 SynctryReleaseShared 方法的源码了,我们继续来查看:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        // 这里就是将释放的信号量资源加回去而已
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // 尝试CAS设置资源,成功直接返回,失败则进入下一循环重试
        if (compareAndSetState(current, next))
            return true;
    }
}

释放资源的流程图如下:

Exchanger

Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger 用于进行线程间的数据交 换。

它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过 exchange 方法交换数据,如果第一个线程先执行 exchange()方法,它会一直等待第二个线程也 执行 exchange 方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产 出来的数据传递给对方。

使用场景

这个玩意的使用场景很少很少……大家对她有个了解即可,大可不必深入。

因为存在很多局限性。

  1. 仅限两个线程
  • 超过两个线程使用同一 Exchanger 会导致未定义行为。
  • 替代方案:使用 CyclicBarrierPhaser 实现多线程同步。
  1. 阻塞风险
  • 若一方线程未到达同步点,另一线程会永久阻塞。
  • 解决方案:使用带超时的 exchange(V x, long timeout, TimeUnit unit)
  1. 性能瓶颈
  • 频繁交换大数据对象会导致内存和 CPU 开销。
  • 优化建议:交换轻量级对象(如引用或标识符),而非完整数据。
  1. 不适用于分布式系统
  • Exchanger 仅限单 JVM 内的线程通信。
  • 替代方案消息队列(如 Kafka)或 RPC 框架(如 gRPC)。

Exchanger在多种并发编程场景中都非常有用。例如,在遗传算法中,可以使用Exchanger来实现个体之间的信息交换;在管道设计中,可以使用Exchanger来传递数据块或任务;在游戏中,可以使用Exchanger来实现玩家之间的物品交易等。

如下代码,用 Exchanger 实现两个线程将交换彼此持有的字符串数据:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import java.util.concurrent.Exchanger;

publicclass ExchangerExample {

    public static void main(String[] args) {
        // 创建一个Exchanger对象
        Exchanger<String> exchanger = new Exchanger<>();

        // 创建一个线程,它将使用"Hello"与另一个线程交换数据
        Thread producer = new Thread(() -> {
            try {
                String producedData = "Hello";
                String consumerData = exchanger.exchange(producedData);
                System.out.println("生产者线程交换后得到的数据: " + consumerData);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "生产者线程");

        // 创建一个线程,它将使用"World"与另一个线程交换数据
        Thread consumer = new Thread(() -> {
            try {
                String consumerData = "World";
                String producedData = exchanger.exchange(consumerData);
                System.out.println("消费者线程交换后得到的数据: " + producedData);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "消费者线程");

        // 启动线程
        producer.start();
        consumer.start();
    }
}

代码中我们创建了一个 Exchanger 对象,并且定义了两个线程:一个生产者线程和一个消费者线程。

生产者线程持有一个字符串 "Hello",而消费者线程持有一个字符串 "World"。两个线程都通过调用 exchanger.exchange() 方法来等待交换数据。

当两个线程都到达交换点时(即都调用了 exchange() 方法),Exchanger 会确保它们安全地交换数据。交换完成后,每个线程都会得到对方原本持有的数据,并打印出来。

实现原理

分别从数据结构和 exchange 方法来实现流程来学习实现原理。

核心数据结构

Participant 线程本地存储

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class Exchanger<V> {
    // 每个线程持有一个Node
    private final Participant participant;

    static final class Participant extends ThreadLocal<Node> {
        public Node initialValue() { return new Node(); }
    }
}


关键作用

  • 每个线程通过 Participant 持有独立的 Node 对象
  • 避免多线程竞争同一存储位置
  • 底层使用 ThreadLocal 实现线程隔离

Node 交换节点设计

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@sun.misc.Contended // 防止伪共享
static final class Node {
    int index;              // Arena下标
    int bound;              // 最近记录的前导边界
    int collides;           // CAS失败计数
    int hash;               // 伪随机自旋
    Object item;            // 携带的数据
    volatile Object match;  // 交换的数据
    volatile Thread parked; // 挂起的线程
}

内存布局优化

使用 @Contended 注解填充缓存行(64 字节)

确保不同线程访问的字段不在同一缓存行

示例内存布局:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
| 64字节缓存行 | Node.item | ...填充... |
| 64字节缓存行 | Node.match | ...填充... |

每个线程的 Node 有一个 match 属性用于存储待交换的数据。

exchange 方法执行流程

主流程源码(精简版)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public V exchange(V x) throws InterruptedException {
    Object v;
    Node[] a;
    Node q = participant.get();

    // Arena模式(
    if ((a = arena) != null ||
        (q = slotExchange(q, x, false, 0L)) == null)
        return (V)v;
    // ...省略超时处理
}

private final Object slotExchange(Node q, Object x, boolean timed, long nanos) {
    // 核心交换逻辑(
    for (;;) {
        if (slot != null) { // 存在等待节点
            Node node = (Node)slot;
            if (U.compareAndSwapObject(this, SLOT, node, null)) {
                Object v = node.item;
                node.match = x; // 数据交换(
                Thread t = node.parked;
                if (t != null)
                    U.unpark(t); // 唤醒对方线程
                return v;
            }
        } elseif (U.compareAndSwapObject(this, SLOT, null, q)) {
            // 挂起当前线程
            return timed ? awaitNanos(q, nanos) : await(q);
        }
    }
}

关键步骤解释

  1. CAS 设置槽位U.compareAndSwapObject(this, SLOT, null, q)
  2. 数据交换:直接修改对方节点的 match 字段
  3. 唤醒机制:通过 Unsafe.unpark() 解除线程阻塞
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-04-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码哥跳动 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
TikTok的算法为何如此有效?
抖音的推荐系统非常擅长理解用户需求——好到让埃隆·马斯克这样的科技巨头都羡慕不已。但抖音的成功秘诀是什么呢?
云云众生s
2025/01/26
2230
TikTok的算法为何如此有效?
前亚马逊产品经理:TikTok的真正优势,并不是算法
在前段时间的风波中,TikTok 的推荐算法一度成为大家争相讨论的话题中心。但在前亚马逊产品经理(同时也是亚马逊战略规划部第一位分析师)Eugene Wei 看来,TikTok 的算法本身并没有特别的突破性创意,TikTok 产品真正的价值点在于 TikTok 的设计和流程里面的每一个元素是怎么互相关联到一起,从而创建出一个数据集,再通过这个数据集,把算法训练成最佳性能的。
程序员小猿
2021/01/20
8680
前亚马逊产品经理:TikTok的真正优势,并不是算法
一文全面了解基于内容的推荐算法
这篇文章我们主要关注的是基于内容的推荐算法,它也是非常通用的一类推荐算法,在工业界有大量的应用案例。
AI科技大本营
2019/07/12
5.1K0
一文全面了解基于内容的推荐算法
5类系统推荐算法,告诉你用户需要什么
序言 最近因为PAC平台自动化的需求,开始探坑推荐系统。这个乍一听去乐趣无穷的课题,对于算法大神们来说是这样的: 而对于刚接触这个领域的我来说,是这样的: 在深坑外围徘徊了一周后,我整理了一些推荐系统
大数据文摘
2018/05/22
1.3K0
胡泳:在“推荐就是一切”的时代
“决定你命运的不是机遇,而是选择”,这句格言常被归于琴·尼德奇(Jean Nidetch),堪称亚里士多德伦理学的一种总结(网上有大量的误传,真的相信其为亚里士多德的金句)。尼德奇1963年创立综合性减肥方案和饮食计划Weight Watchers,旨在帮助人们通过健康的饮食和生活方式的改变来减轻体重。她是今天我们所熟悉的网上“健康达人”的真正鼻祖,一位来自皇后区的家庭主妇在减肥过程中,凭借雄心壮志,找到了一个出口、一种身份和一条自我实现的路径。
小腾资讯君
2025/05/09
1020
初识推荐算法
推荐系统是根据用户的浏览习惯,确定用户的兴趣,通过发掘用户的行为,将合适的信息推荐给用户,满足用户的个性化需求,帮助用户找到对他胃口但是不易找到的信息或商品。
鳄鱼儿
2024/05/21
1390
初识推荐算法
美国电商平台的个性化推荐算法实践及优化思路
本文介绍了手工艺品电商平台Etsy的个性化推荐算法实践及优化思路,计算过程分为基于历史数据建模和计算推荐结果两个阶段,采用的手段主要包括矩阵分解、交替最小二乘、随机SVD(奇异值分解)和局部敏感哈希等。 提供个性化推荐对网上购物市场非常重要。个性化推荐对买卖双方都是有利的:购买者不用自己去搜索就可以直接获得他们感兴趣的产品信息,卖家则可以以较小的市场营销代价获得更好的产品曝光度。在这篇文章中,我们将介绍我们在Esty(美国网络商店平台,以手工艺成品买卖为主要特色——译者注)中使用的一些推荐
机器学习AI算法工程
2018/03/12
1.5K0
美国电商平台的个性化推荐算法实践及优化思路
基于标签的实时短视频推荐系统 | 深度
导语:作者在《基于内容的推荐算法》这篇文章中对基于内容的推荐算法做了比较详细的讲解,其中一类非常重要的内容推荐算法是基于标签的倒排索引算法,也是工业界用的比较多的算法,特别是新闻资讯类、短视频类产品大量采用该类算法。在本篇文章中作者会结合电视猫的业务场景及工程实践经验来详细讲解基于标签的倒排索引算法的原理及工程落地方案细节。
AI科技大本营
2019/07/30
3.3K0
基于标签的实时短视频推荐系统 | 深度
万字长文带你了解推荐系统全貌!
如果说互联网的目标就是连接一切,那么推荐系统的作用就是建立更加有效率的连接,推荐系统可以更有效率的连接用户与内容和服务,节约了大量的时间和成本。
Datawhale
2020/10/27
7640
万字长文带你了解推荐系统全貌!
Mozilla怒喷当前推荐系统技术:算法“陈旧弱智”,效果非常糟糕!
编译 | 核子可乐、Tina Mozilla 喷当前视频平台引领者所使用的推荐系统技术:使用的算法“陈旧弱智”,效果非常“糟糕”,堪称“恐怖秀”。 根据 Mozilla 本周三发布的调查研究结果表明,大部分饱受用户们吐槽的 YouTube 视频推荐内容都出自该网站陈旧的 AI 算法之手。 该调查研究从去年 9 月开始启动,总共涉及到 37380 名 YouTube 观众。根据 Mozilla 的报告,这是同类研究中规模最大的一次,而且显示出来的结果只是“冰山的一角”,其中每项发现都值得进一步跟踪并做出深刻剖
深度学习与Python
2023/04/01
2790
Mozilla怒喷当前推荐系统技术:算法“陈旧弱智”,效果非常糟糕!
个性化推荐算法总结[通俗易懂]
并且,推荐系统能够很好的发掘物品的长尾,挑战传统的2/8原则(80%的销售额来自20%的热门品牌)。
全栈程序员站长
2022/08/15
2.1K0
个性化推荐算法总结[通俗易懂]
达观数据是如何基于用户历史行为进行精准个性化推荐的?
个性化推荐系统实践 达观数据于敬 在DT(data technology)时代,网上购物、观看视频、聆听音乐、阅读新闻等各个领域无不充斥着各种推荐,个性化推荐已经完全融入人们的日常生活当中。个性化推荐根据用户的历史行为数据进行深层兴趣点挖掘,将用户最感兴趣的物品推荐给用户,从而做到千人千面,不仅满足了用户本质的信息诉求,也最大化了企业的自身利益,所以个性化推荐蕴含着无限商机。 号称“推荐系统之王”的电子商务网站亚马逊曾宣称,亚马逊有20%~30%的销售来自于推荐系统。其最大优势就在于个性化推荐系统,该系统让
达观数据
2018/03/30
1.6K0
为什么刷短视频上瘾?周受资来解密了
“算法推荐里不存在任何强迫性质的规则,作品能让人产生共鸣,它自然会走红。”TikTok首席执行官周受资在接受专访时说道。
用户9861443
2023/11/27
2950
为什么刷短视频上瘾?周受资来解密了
快点进来get“推荐系统常用的推荐算法”
一、推荐系统概述和常用评价指标 1.1 推荐系统的特点 在知乎搜了一下推荐系统,果真结果比较少,显得小众一些,然后大家对推荐系统普遍的观点是: (1)重要性UI>数据>算法,就是推荐系统中一味追求先
小莹莹
2018/04/25
1.2K0
快点进来get“推荐系统常用的推荐算法”
小程序的智能推荐与大数据应用
智能推荐 是利用算法和模型,通过分析用户的历史行为、兴趣偏好等数据,预测并推荐最符合用户需求的内容或商品。在小程序中,智能推荐常见的应用场景包括商品推荐、内容推荐、社交推荐等。
LucianaiB
2025/02/10
2680
短视频到底如何推荐的?深度剖析视频算法推送原理详细且专业的解读-优雅草卓伊凡-【01】短视频算法推荐之数据收集
在当今数字化时代,视频算法推送系统作为各类视频平台的核心技术,其数据收集环节是实现精准个性化推荐的基础。数据收集主要涵盖用户行为数据采集和内容数据解析两个方面,下面进行详细阐述。
卓伊凡
2025/03/16
2660
推荐系统基础:算法与应用
推荐系统是一种利用算法和数据分析技术为用户提供个性化推荐的技术。它在电子商务、社交媒体、内容提供等领域发挥着重要作用。本文将详细介绍推荐系统的基础知识,包括常见的算法及其应用,并通过一个完整的项目展示推荐系统的部署过程。
数字扫地僧
2024/08/06
2200
技术经理眼中的从零搭建推荐体系—全链路
随着信息技术的迅速发展和信息内容的日益增长,“信息过载”问题愈来愈严重,愈发带来很大的信息负担。推荐系统可以有效缓解此难题,从而得到推崇并加以广泛应用。 简单来说:推荐系统是通过挖掘用户与项目之间的二元关系,帮助用户从大量数据中发现其可能感兴趣的项目如网页、服务、商品、人等,并生成个性化推荐以满足个性化需求。目前市场上对于电子商务的推荐系统有亚马逊、阿里巴巴、豆瓣网、当当网等,信息检索的有谷歌、雅虎、百度等,以及在其它周边领域广泛运用如移动应用、电子旅游、互联网广告等。本文只阐述网页内容,特制新闻方面的项目体系搭建。
guichen1013
2022/09/22
4600
技术经理眼中的从零搭建推荐体系—全链路
推荐系统产品与算法概述 | 深度
作者在《推荐系统的工程实现》(点击蓝字可回顾)这篇文章的第五部分“推荐系统范式”中讲到工业级推荐系统有非个性化范式、完全个性化范式、群组个性化范式、标的物关联标的物范式、笛卡尔积范式等 5种 常用的推荐范式。本文会按照这5大范式来讲解常用的推荐算法,但不会深入讲解算法的实现原理,只是概述算法的实现思路,后面的系列文章我会对常用的重点算法进行细致深入剖析。
AI科技大本营
2019/06/20
1.7K0
推荐系统产品与算法概述 | 深度
响铃:抖音的算法,美拍的社区,谁能赢得短视频内容升级战?
2018年开年至今,短视频们依然酣战。从西瓜视频“百万英雄”疯狂撒币,到抖音在春节期间用户暴涨,再到近期监管整改问题爆出,战况可谓跌宕起伏。
曾响铃
2018/08/21
3430
响铃:抖音的算法,美拍的社区,谁能赢得短视频内容升级战?
推荐阅读
相关推荐
TikTok的算法为何如此有效?
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档