不知道小伙伴们有没有做过流量控制,流量控制这个怎么整?最近刚好看见一个类特别适合做流量控制,几行代码就能搞定,一起来看下吧。
一、Semaphore源码
Semaphore 使用了一个计数器来管理“许可证”的数量,每个线程需要获取许可证才能访问受限资源。当许可证数量为 0 时,其他线程会被阻塞,直到有线程释放许可证。
1、构造方法
public Semaphore(int paramInt) {
this.sync = new NonfairSync(paramInt);
}
public Semaphore(int paramInt, boolean paramBoolean) {
this.sync = paramBoolean ? new FairSync(paramInt) : new NonfairSync(paramInt);
}
permits 表示初始化的许可证数量。
fair 表示是否公平分配许可证。若为 true,将以 FIFO 顺序授予许可证,若为 false,则是非公平的。
2、主要字段
private final Sync sync;
sync 是一个内部类 Sync 的实例,Sync 继承自 AbstractQueuedSynchronizer(AQS)。在 Semaphore 中,所有的核心逻辑几乎都委托给 Sync 这个内部类来处理,分为公平(FairSync)和非公平(NonfairSync)两种实现。
3、核心方法
3.1. acquire() 方法
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
acquire() 方法会尝试获取一个许可证。如果许可证不足,它会阻塞线程直到许可证可用。这个方法的内部实现依赖 AQS 提供的 acquireSharedInterruptibly 方法,表示以共享模式获取资源。
acquireSharedInterruptibly 的流程:
首先,尝试获取许可证数量,如果成功则直接返回。
如果许可证数量不足,则将当前线程加入同步队列,并且挂起等待许可证释放。
3.2. release() 方法
public void release() {
sync.releaseShared(1);
}
release() 方法用于释放一个许可证,从而允许其他阻塞的线程获取该许可证。它内部调用了 AQS 的 releaseShared 方法,在许可证释放后,会唤醒等待队列中的第一个线程。
releaseShared 的流程:
增加可用的许可证数量。
唤醒在同步队列中等待的线程,使其可以再次尝试获取许可证。
3.3. tryAcquire(int permits) 方法
public boolean tryAcquire(int permits) {
return sync.tryAcquireShared(permits) >= 0;
}
tryAcquire 尝试获取指定数量的许可证,且不会阻塞线程。若许可证不足,它会立即返回 false。在并发限制的场景中,tryAcquire 常用于在没有资源的情况下避免阻塞。
3.4. availablePermits() 方法
public int availablePermits() {
return sync.getPermits();
}
availablePermits 返回当前可用的许可证数量,是一个非阻塞的查询操作。该方法通过 sync 对象直接返回计数值,反映当前的资源可用性。
4、Sync 内部类
Semaphore 通过 Sync 类实现了具体的同步逻辑。Sync 继承了 AbstractQueuedSynchronizer(AQS),并根据公平和非公平性分别定义了 FairSync 和 NonfairSync。
4.1. 公平模式
公平模式下的 FairSync 类遵循 FIFO 原则,保证线程获取许可证的顺序。
static final class FairSync extends Sync {
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors()) {
return -1; // 其他线程在排队
}
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining)) {
return remaining;
}
}
}
}
在 tryAcquireShared 方法中,如果当前线程前面有其他排队线程,它会返回 -1 并等待。否则,它会尝试减小许可证数量。
4.2. 非公平模式
非公平模式的 NonfairSync 类会立即尝试获取许可证,而不关心是否有其他线程在排队。这个模式可以提高性能,但可能会导致“饥饿”现象。
static final class NonfairSync extends Sync {
protected int tryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining)) {
return remaining;
}
}
}
}
在 tryAcquireShared 方法中,非公平模式不检查排队情况,直接尝试获取许可证。如果获取失败,它会反复尝试直到成功。
Semaphore 通过许可证的概念来控制对共享资源的访问数量。核心逻辑通过 Sync 内部类实现,并依赖 AbstractQueuedSynchronizer(AQS)管理同步队列和许可证计数。
公平和非公平模式的选择取决于性能需求和公平性要求。在公平模式下,FIFO 保证了线程访问顺序,但性能稍有影响;非公平模式则倾向于提高吞吐量,但可能会造成某些线程“饥饿”。
二、流量控制的应用
场景:限制对特定资源的并发访问数量,比如流量控制。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
private static final int THREAD_COUNT = 10;
// 当只允许单线程访问时,可以将 Semaphore 配置为 1,以实现互斥访问。
private static final Semaphore semaphore = new Semaphore(2); // 最大并发数为2
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
executor.execute(() -> {
try {
semaphore.acquire(); // 获取许可证
System.out.println(Thread.currentThread().getName() + ": 访问资源");
Thread.sleep(1000); // 模拟业务处理
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + ": 资源释放");
semaphore.release(); // 释放许可证
}
});
}
executor.shutdown();
}
}
这个示例创建了一个线程池,并发数为 10,每个线程尝试获取 Semaphore 的许可证,许可证上限为 2。
只有 2个线程可以同时访问资源,超过的线程会被阻塞,直到有线程释放许可证。
在资源访问类场景中,Semaphore 可以有效限制并发数,确保资源不会被过度使用。
运行结果:
pool-1-thread-1: 访问资源
pool-1-thread-2: 访问资源
pool-1-thread-1: 资源释放
pool-1-thread-2: 资源释放
pool-1-thread-3: 访问资源
pool-1-thread-4: 访问资源
pool-1-thread-3: 资源释放
pool-1-thread-4: 资源释放
pool-1-thread-5: 访问资源
pool-1-thread-6: 访问资源
pool-1-thread-6: 资源释放
pool-1-thread-5: 资源释放
pool-1-thread-7: 访问资源
pool-1-thread-8: 访问资源
pool-1-thread-8: 资源释放
pool-1-thread-7: 资源释放
pool-1-thread-9: 访问资源
pool-1-thread-10: 访问资源
pool-1-thread-10: 资源释放
pool-1-thread-9: 资源释放
三、Semaphore的优点
灵活的并发控制
Semaphore 允许我们指定可用的“许可证”数量,从而控制同时访问共享资源的线程数。这对于限制访问次数或并发数量非常有效,能够有效地防止系统资源过载。
实现简单,容易理解
Semaphore 的使用相对简单,通常只需设置最大并发数(即许可证数),并在需要访问共享资源的线程中通过 acquire() 和 release() 方法控制许可证的获取和释放。它的设计简单直观,容易上手。
避免死锁
与某些同步工具(如 synchronized 或 ReentrantLock)相比,Semaphore 并不需要考虑传统的死锁问题。在适当使用的情况下,Semaphore 不会引入死锁,因为它允许线程在获取许可证失败时不会阻塞太久,并能随时释放已获得的许可证。
适用于流量控制和资源池管理
Semaphore 非常适合实现流量控制(如限流器)或资源池管理(如数据库连接池)。它通过限制同时执行的操作数(如数据库连接数、API 请求数)来防止系统过载。
非阻塞(可选)
使用 Semaphore 的 tryAcquire() 方法,线程可以选择不阻塞地尝试获取许可证。如果获取失败,线程可以立即返回,而不需要进入阻塞状态。这对于某些场景(如需要高响应性的请求)非常有用。
四、Semaphore的缺点
容易导致资源耗尽
如果没有适当地释放许可证,Semaphore 可能导致资源耗尽或死锁。由于它是基于许可证计数的,如果某些线程在使用资源后没有调用 release() 方法,许可证会保持“占用”状态,导致其他线程无法继续执行。
不适合复杂的同步操作
Semaphore 主要用于控制访问的线程数量,不能很好地处理更复杂的同步需求(如条件同步)。例如,在一些更复杂的并发场景中,可能需要更精细的控制(如多条件等待),这时 Semaphore 可能不够灵活,需要考虑使用其他同步机制(如 CountDownLatch、CyclicBarrier、ReentrantLock 等)。
较少的语义
Semaphore 本身并没有内建的语义来表示线程在竞争资源时的状态,它仅仅是简单的许可证计数器。对某些复杂的场景来说,Semaphore 可能不能直接表达应用程序的业务逻辑和同步需求,使用起来可能不如其他工具直观。
不保证公平性
Semaphore 默认情况下并不保证公平性,即不能确保按请求顺序分配许可证。在高并发情况下,这可能会导致某些线程长时间无法获得许可证,或者某些线程获得许可证的机会较多。为了实现公平性,可以使用 fair 参数来构造一个公平的 Semaphore,但这可能会带来一定的性能开销。
不具备条件变量的功能
Semaphore 并不像 Object.wait() 和 Object.notify() 那样能够支持复杂的条件变量功能(即线程需要等待某些特定条件的出现才继续执行)。如果需要根据某些条件做同步操作,Semaphore 可能不足够强大,通常需要与其他同步工具(如 Lock 或 Condition)结合使用。
五、最后总结
Semaphore 是一种非常有效的并发控制工具,适用于限制并发访问的场景,比如连接池、流量控制等。但它也有一些局限性,特别是在复杂的同步需求或需要精细条件同步的场景中,可能需要其他更适合的工具来配合使用。其易用性和灵活性使得它在一些场景下非常有用,但同时也需要谨慎管理许可证的获取与释放,以避免出现资源耗尽或死锁等问题。
领取专属 10元无门槛券
私享最新 技术干货