前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >AQS、CountDownLatch、CyclicBarrier、Semaphore、Exchanger

AQS、CountDownLatch、CyclicBarrier、Semaphore、Exchanger

原创
作者头像
疯狂的KK
修改2024-06-05 17:00:15
820
修改2024-06-05 17:00:15
举报
文章被收录于专栏:Java项目实战Java项目实战
开篇

在编程的竞技场中,多线程与并发是每一级大牛必备的技能!今天,就让我们深入了解Java并发武器库中的“五神兵”——AQS、CountDownLatch、CyclicBarrier、Semaphore、Exchanger的强大之处。他们如棋盘上的棋子,既能彼此协调,又能独当一面,解决了无数线程之问的冲突、同步与协作难题。

::: tip 走过路过,不要错过!

点赞、评论、转发是对笔者最大的支持!是不是已经迫不及待想要揭开这五神兵的神秘面纱?阅读完本文后,学会即点即用,咱也能成为一名真正的高级并发架构师!🚀

:::


AQS — 顶级武将,无懈可击!(AbstractQueuedSynchronizer)

简介与原理

AQS是Java并发库的基础,很多Java并发类都以其为基石来构建更为复杂的同步机制。它提供了一种框架,使得线程同步更加高效。

AQS通过一个内部队列及一个state变量来管理同步状态,确保只有一个线程获取到锁。

实际应用场景
  • 队列同步器ReentrantLock
  • 信号量Semaphore
  • 闭锁CountDownLatch
  • 栅栏CyclicBarrier
  • 读写锁ReentrantReadWriteLock
代码示例

以下是一个自定义的同步组件示例,实现了独占和共享锁功能:

代码语言:java
复制
class CustomMutex {
    // 使用AQS实现互斥锁功能
    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {}

    private static final class NonfairSync extends Sync {
        NonfairSync() {
            setState(0); // 初始状态为0,可使用
        }

        // 尝试获取锁
        @Override
        protected boolean tryAcquire(int acquires) {
            return compareAndSetState(0, 1);
        }

        // 释放锁
        @Override
        protected boolean tryRelease(int releases) {
            setState(0);
            return true;
        }
    }

    public CustomMutex() {
        sync = new NonfairSync();
    }

    // 加锁
    public void lock() {
        sync.acquire(1);
    }

    // 解锁
    public void unlock() {
        sync.release(1);
    }
}

CountDownLatch — 开门神兵,开启世界!

简介与原理

CountDownLatch是AQS的实现之一,主要用于某一线程等待其他线程完成操作后才能继续执行,它允许一个或多个线程等待直到在其他线程中发生某些操作。

实际应用场景

常用于主线程必须等待其他所有线程完成某项工作后才能继续执行的场景。

代码示例
代码语言:java
复制
public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        int threads = 5;
        CountDownLatch latch = new CountDownLatch(threads);

        for (int i = 0; i < threads; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " is doing some task");
                // 模拟工作时间
                try {
                    Thread.sleep((long) (Math.random() * 1000));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println(Thread.currentThread().getName() + " has finished its task");
                // 减少计数器
                latch.countDown();
            }).start();
        }

        // 主线程等待其他线程完成
        latch.await();
        System.out.println("All workers have finished their tasks.");
    }
}

CyclicBarrier — 天罗地网,协同作战!

简介与原理

CyclicBarrier用于线程同步,允许多个线程互相等待到达屏障点。

当指定数目的线程都调用了await方法后,这些线程才会继续执行。

实际应用场景

如在进行大规模并行处理时,可以使用CyclicBarrier让多个线程在某个点集结,共同执行任务。

代码示例
代码语言:java
复制
public class CyclicBarrierExample {
    public static void main(String[] args) {
        int numThreads = 5;
        CyclicBarrier barrier = new CyclicBarrier(numThreads);

        for (int i = 0; i < numThreads; i++) {
            final int threadNum = i;
            new Thread(() -> {
                System.out.println("Thread " + threadNum + " reaching barrier.");
                try {
                    barrier.await();  // 等待所有线程到达屏障点
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

Semaphore — 交通信号灯,有序通行!

简介与原理

Semaphore可以控制访问特定资源的线程数量。它通过设置许可数控制并发数量。

实际应用场景

适用于控制同时访问某个特定资源的线程数量,如数据库连接池,或限流功能。

代码示例
代码语言:java
复制
public class SemaphoreExample {
    public static void main(String[] args) {
        final int permits = 3;  // 信号量的初始许可数量
        Semaphore semaphore = new Semaphore(permits);

        for (int i = 0; i < 5; i++) {
            new Thread(new Worker(i, semaphore)).start();
        }
    }

    static class Worker implements Runnable {
        private final int workerId;
        private final Semaphore semaphore;

        Worker(int id, Semaphore semaphore) {
            this.workerId = id;
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println("Worker " + workerId + " got a permit.");
                // 模拟工作耗时
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
                System.out.println("Worker " + workerId + " released the permit.");
            }
        }
    }
}

Exchanger — 交流使者,双向通道!

简介与原理

Exchanger是一个用于两个线程之间交换数据的工具,通常用于生产者消费者模式。

实际应用场景

适合于需要两个线程在不同的点互相交换数据的场景,比如在网格计算、数据分发等情形。

代码示例
代码语言:java
复制
public class ExchangerExample {
    static Exchanger<Integer> exchanger = new Exchanger<>();

    public static void main(String[] args) throws InterruptedException {
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("Consumer received: " + exchanger.exchange(1));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        consumer.start();

        System.out.println("Producer sends: " + exchanger.exchange(2));
    }
}

一、AQS(AbstractQueuedSynchronizer)

1.1 概述

AQS 是 Java 并发包中一个非常重要的抽象类,用于构建锁和同步器。多数同步类(如 ReentrantLock、CountDownLatch 等)都是基于 AQS 实现的。

1.2 底层原理

AQS 通过一个 FIFO 队列来实现资源的争夺。它使用一个 state 变量来表示同步状态,通过 CAS 操作进行原子更新。

  • 状态为 0 表示无锁,1 表示有锁。
  • 线程通过 acquire(int arg)release(int arg) 方法来申请和释放资源。

1.3 应用场景

AQS 主要用于构建锁和其他同步器,比如:

  • ReentrantLock
  • CountDownLatch
  • Semaphore
  • ReadWriteLock

1.4 实践代码

代码语言:java
复制
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class SimpleLock {
    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }

    private final Sync sync = new Sync();

    public void lock() {
        sync.acquire(1);
    }

    public void unlock() {
        sync.release(1);
    }
}

二、CountDownLatch

2.1 概述

CountDownLatch 是一个同步工具类,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。

2.2 底层原理

CountDownLatch 通过 AQS 的 state 来计数,调用 countDown() 方法会将 state 减 1,当 state 减到 0 时,所有等待线程会被唤醒。

2.3 应用场景

CountDownLatch 适用于:

  • 多线程并行计算,等待所有计算线程完成后再进行汇总。
  • 控制某个线程在多个线程完成初始化工作后再开始工作。

2.4 实践代码

代码语言:java
复制
import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        final int threadCount = 3;
        final CountDownLatch latch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " is running");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    latch.countDown();
                }
            }).start();
        }

        latch.await();
        System.out.println("All threads have finished.");
    }
}

三、CyclicBarrier

3.1 概述

CyclicBarrier 是一个同步工具类,它允许一组线程互相等待,直到所有线程都到达某个屏障点。

3.2 底层原理

CyclicBarrier 使用一个计数器来跟踪到达屏障的线程数量。当计数器达到预设值时,所有等待的线程会被唤醒,并且计数器被重置。

3.3 应用场景

CyclicBarrier 适用于:

  • 多个线程并行计算,所有线程到达某个阶段后再继续执行。
  • 多线程游戏中,各个玩家需要在同一时间点开始游戏。

3.4 实践代码

代码语言:java
复制
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    private static final int THREAD_COUNT = 3;
    private static final CyclicBarrier BARRIER = new CyclicBarrier(THREAD_COUNT, () -> {
        System.out.println("All threads have reached the barrier. Let's proceed.");
    });

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            new Thread(new Task()).start();
        }
    }

    static class Task implements Runnable {
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " is performing some work.");
                Thread.sleep((long) (Math.random() * 1000));
                System.out.println(Thread.currentThread().getName() + " has reached the barrier.");
                BARRIER.await();
                System.out.println(Thread.currentThread().getName() + " is proceeding with further work.");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

四、Semaphore

4.1 概述

Semaphore 是一个计数信号量,用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。

4.2 底层原理

Semaphore 通过内部的计数器来控制资源的访问数量。调用 acquire() 方法会尝试获取一个许可,计数器减 1;调用 release() 方法会释放一个许可,计数器加 1。

4.3 应用场景

Semaphore 适用于:

  • 限制对某些资源的并发访问数量,如数据库连接池。
  • 控制同时操作的线程数量,防止资源过度占用。

4.4 实践代码

代码语言:java
复制
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    private static final int PERMITS = 3;
    private static final Semaphore SEMAPHORE = new Semaphore(PERMITS);

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(new Task()).start();
        }
    }

    static class Task implements Runnable {
        @Override
        public void run() {
            try {
                SEMAPHORE.acquire();
                System.out.println(Thread.currentThread().getName() + " acquired a permit.");
                Thread.sleep((long) (Math.random() * 1000));
                System.out.println(Thread.currentThread().getName() + " released a permit.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                SEMAPHORE.release();
            }
        }
    }
}

五、Exchanger

5.1 概述

Exchanger 是一个用于线程间数据交换的同步点。两个线程可以在此交换数据,每个线程通过 exchange 方法交换彼此的数据。

5.2 底层原理

Exchanger 通过一个内部的交换槽(exchange slot)来实现数据交换。当一个线程调用 exchange() 方法时,会等待另一个线程也调用 exchange() 方法,然后互相交换数据。

5.3 应用场景

Exchanger 适用于:

  • 两个线程需要交换数据的场景,如生产者和消费者模型中的缓冲区交换。
  • 双方均需要对方提供的数据来继续执行的场景。

5.4 实践代码

代码语言:java
复制
import java.util.concurrent.Exchanger;

public class ExchangerDemo {
    private static final Exchanger<String> EXCHANGER = new Exchanger<>();

    public static void main(String[] args) {
        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();
    }

    static class Producer implements Runnable {
        @Override
        public void run() {
            try {
                String data = "Data from Producer";
                System.out.println("Producer is exchanging data: " + data);
                String receivedData = EXCHANGER.exchange(data);
                System.out.println("Producer received data: " + receivedData);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                String data = "Data from Consumer";
                System.out.println("Consumer is exchanging data: " + data);
                String receivedData = EXCHANGER.exchange(data);
                System.out.println("Consumer received data: " + receivedData);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

六、深入理解和对比

6.1 AQS(AbstractQueuedSynchronizer)

AQS 是 Java 并发包的基石,它为实现锁和同步器提供了一个框架。AQS 通过维护一个 FIFO 队列来管理线程的等待状态,并通过 state 变量控制同步状态。

优点
  • 为各种同步器提供了统一的实现基础。
  • 通过 CAS(Compare-And-Swap)操作保证了高效和线程安全的状态更新。
缺点
  • 实现复杂,需要深入理解其机制才能正确使用。

6.2 CountDownLatch

CountDownLatch 是一个简单而强大的等待机制,适用于一组线程等待另一个线程完成工作。

优点
  • 使用简单,适合线程间的等待和通知机制。
  • 没有重复使用的能力(计数器只能减到 0)。
缺点
  • 只能使用一次,不能重置。

6.3 CyclicBarrier

CyclicBarrier 允许一组线程互相等待,直到所有线程都到达某个屏障点。它可以被重复使用。

优点
  • 可以重用,适合多次使用的同步场景。
  • 方便的回调机制,当所有线程到达屏障点时执行。
缺点
  • 实现相对复杂,需要处理 BrokenBarrierException 等异常。

6.4 Semaphore

Semaphore 控制同时访问某资源的线程数量,可以用于实现资源池等场景。

优点
  • 灵活的许可机制,可控制并发线程数。
  • 支持公平和非公平模式。
缺点
  • 需要手动管理许可的获取和释放,容易出现死锁或许可泄漏。

6.5 Exchanger

Exchanger 用于两个线程之间的数据交换。

优点
  • 简化了两个线程之间的数据交换过程。
  • 使用方便,避免了复杂的同步和等待机制。
缺点
  • 仅适用于两个线程之间的数据交换。

6.6 实践对比与总结

在实际项目中,我们需要根据不同的需求选择合适的同步工具。下面总结了这些工具的适用场景:

工具

适用场景

AQS

构建自定义同步器、锁等复杂同步机制。

CountDownLatch

一个或多个线程等待其他线程完成工作,适用于一次性任务。

CyclicBarrier

一组线程需要在某个点上相互等待,适用于循环使用的场景。

Semaphore

控制资源的并发访问数量,如连接池、限流等。

Exchanger

两个线程之间的数据交换,如数据传递、缓冲区交换等。

七、项目实战:模拟高并发场景

在实际项目中,我们可能会遇到需要控制并发访问、协调线程操作的复杂场景。下面我们用一个模拟高并发访问资源的例子来演示如何综合运用这些同步工具。

7.1 场景描述

假设我们有一个高并发访问的资源(如数据库连接),我们需要:

  • 控制同时访问资源的线程数量(使用 Semaphore)。
  • 等待所有线程完成初始化工作后再开始访问资源(使用 CountDownLatch)。
  • 各个线程在某个阶段需要同步(使用 CyclicBarrier)。

7.2 实践代码(续)

代码语言:java
复制
import java.util.concurrent.*;

public class HighConcurrencyDemo {
    private static final int THREAD_COUNT = 10;
    private static final int PERMITS = 3;
    private static final CountDownLatch INIT_LATCH = new CountDownLatch(THREAD_COUNT);
    private static final CyclicBarrier BARRIER = new CyclicBarrier(THREAD_COUNT);
    private static final Semaphore SEMAPHORE = new Semaphore(PERMITS);

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);

        for (int i = 0; i < THREAD_COUNT; i++) {
            executor.submit(new Worker());
        }

        executor.shutdown();
    }

    static class Worker implements Runnable {
        @Override
        public void run() {
            try {
                // 模拟初始化工作
                System.out.println(Thread.currentThread().getName() + " is initializing.");
                Thread.sleep((long) (Math.random() * 1000));
                INIT_LATCH.countDown();
                INIT_LATCH.await();

                // 等待其他线程到达屏障
                System.out.println(Thread.currentThread().getName() + " is waiting at the barrier.");
                BARRIER.await();

                // 控制访问资源的并发数量
                SEMAPHORE.acquire();
                System.out.println(Thread.currentThread().getName() + " is accessing the resource.");
                Thread.sleep((long) (Math.random() * 1000));
                System.out.println(Thread.currentThread().getName() + " has finished accessing the resource.");
                SEMAPHORE.release();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

7.3 代码分析

  1. 初始化工作(CountDownLatch)
    • 每个线程在开始工作前模拟初始化过程,初始化完成后调用 countDown() 方法。
    • 主线程或者其他线程通过 await() 方法等待所有初始化工作完成。
  2. 同步屏障(CyclicBarrier)
    • 所有线程在某个阶段需要同步,调用 await() 方法等待其他线程到达屏障点。
    • 当所有线程到达屏障点后,继续执行后续工作。
  3. 并发控制(Semaphore)
    • 使用 Semaphore 来控制同时访问资源的线程数量。
    • 线程通过 acquire() 方法获取许可,访问完资源后通过 release() 方法释放许可。

7.4 运行结果

运行上述代码,你会看到各个线程按顺序完成初始化、等待在屏障点、并发访问资源的过程。控制台输出将显示线程的执行步骤,便于理解线程间的协作和同步机制。

八、总结

这篇文章详细介绍了 Java 并发编程中的五大核心工具类:AQS、CountDownLatch、CyclicBarrier、Semaphore 和 Exchanger。我们不仅讨论了它们的底层原理和应用场景,还通过代码示例展示了如何在实际项目中使用这些工具。

8.1 关键点回顾

  • AQS 是构建锁和同步器的基础框架,通过 FIFO 队列和 CAS 操作实现高效的线程同步。
  • CountDownLatch 适用于一次性任务的等待和通知机制。
  • CyclicBarrier 适用于需要重复使用的同步场景,允许一组线程在某个屏障点等待。
  • Semaphore 控制资源的并发访问数量,适用于限流和资源池管理。
  • Exchanger 简化了两个线程之间的数据交换过程。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 开篇
  • AQS — 顶级武将,无懈可击!(AbstractQueuedSynchronizer)
    • 简介与原理
      • 实际应用场景
        • 代码示例
        • CountDownLatch — 开门神兵,开启世界!
          • 简介与原理
            • 实际应用场景
              • 代码示例
              • CyclicBarrier — 天罗地网,协同作战!
                • 简介与原理
                  • 实际应用场景
                    • 代码示例
                    • Semaphore — 交通信号灯,有序通行!
                      • 简介与原理
                        • 实际应用场景
                          • 代码示例
                          • Exchanger — 交流使者,双向通道!
                            • 简介与原理
                              • 实际应用场景
                                • 代码示例
                                • 一、AQS(AbstractQueuedSynchronizer)
                                  • 1.1 概述
                                    • 1.2 底层原理
                                      • 1.3 应用场景
                                        • 1.4 实践代码
                                        • 二、CountDownLatch
                                          • 2.1 概述
                                            • 2.2 底层原理
                                              • 2.3 应用场景
                                                • 2.4 实践代码
                                                • 三、CyclicBarrier
                                                  • 3.1 概述
                                                    • 3.2 底层原理
                                                      • 3.3 应用场景
                                                        • 3.4 实践代码
                                                        • 四、Semaphore
                                                          • 4.1 概述
                                                            • 4.2 底层原理
                                                              • 4.3 应用场景
                                                                • 4.4 实践代码
                                                                • 五、Exchanger
                                                                  • 5.1 概述
                                                                    • 5.2 底层原理
                                                                      • 5.3 应用场景
                                                                        • 5.4 实践代码
                                                                        • 六、深入理解和对比
                                                                          • 6.1 AQS(AbstractQueuedSynchronizer)
                                                                            • 优点
                                                                            • 缺点
                                                                          • 6.2 CountDownLatch
                                                                            • 优点
                                                                            • 缺点
                                                                          • 6.3 CyclicBarrier
                                                                            • 优点
                                                                            • 缺点
                                                                          • 6.4 Semaphore
                                                                            • 优点
                                                                            • 缺点
                                                                          • 6.5 Exchanger
                                                                            • 优点
                                                                            • 缺点
                                                                          • 6.6 实践对比与总结
                                                                          • 七、项目实战:模拟高并发场景
                                                                            • 7.1 场景描述
                                                                              • 7.2 实践代码(续)
                                                                                • 7.3 代码分析
                                                                                  • 7.4 运行结果
                                                                                  • 八、总结
                                                                                    • 8.1 关键点回顾
                                                                                    相关产品与服务
                                                                                    GPU 云服务器
                                                                                    GPU 云服务器(Cloud GPU Service,GPU)是提供 GPU 算力的弹性计算服务,具有超强的并行计算能力,作为 IaaS 层的尖兵利器,服务于生成式AI,自动驾驶,深度学习训练、科学计算、图形图像处理、视频编解码等场景。腾讯云随时提供触手可得的算力,有效缓解您的计算压力,提升业务效率与竞争力。
                                                                                    领券
                                                                                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档