首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Java Semaphore源码解读和流量控制的应用

不知道小伙伴们有没有做过流量控制,流量控制这个怎么整?最近刚好看见一个类特别适合做流量控制,几行代码就能搞定,一起来看下吧。

一、Semaphore源码

Semaphore 使用了一个计数器来管理“许可证”的数量,每个线程需要获取许可证才能访问受限资源。当许可证数量为 0 时,其他线程会被阻塞,直到有线程释放许可证。

1、构造方法

public Semaphore(int paramInt) {

this.sync = new NonfairSync(paramInt);

}

public Semaphore(int paramInt, boolean paramBoolean) {

this.sync = paramBoolean ? new FairSync(paramInt) : new NonfairSync(paramInt);

}

permits 表示初始化的许可证数量。

fair 表示是否公平分配许可证。若为 true,将以 FIFO 顺序授予许可证,若为 false,则是非公平的。

2、主要字段

private final Sync sync;

sync 是一个内部类 Sync 的实例,Sync 继承自 AbstractQueuedSynchronizer(AQS)。在 Semaphore 中,所有的核心逻辑几乎都委托给 Sync 这个内部类来处理,分为公平(FairSync)和非公平(NonfairSync)两种实现。

3、核心方法

3.1. acquire() 方法

public void acquire() throws InterruptedException {

sync.acquireSharedInterruptibly(1);

}

acquire() 方法会尝试获取一个许可证。如果许可证不足,它会阻塞线程直到许可证可用。这个方法的内部实现依赖 AQS 提供的 acquireSharedInterruptibly 方法,表示以共享模式获取资源。

acquireSharedInterruptibly 的流程:

首先,尝试获取许可证数量,如果成功则直接返回。

如果许可证数量不足,则将当前线程加入同步队列,并且挂起等待许可证释放。

3.2. release() 方法

public void release() {

sync.releaseShared(1);

}

release() 方法用于释放一个许可证,从而允许其他阻塞的线程获取该许可证。它内部调用了 AQS 的 releaseShared 方法,在许可证释放后,会唤醒等待队列中的第一个线程。

releaseShared 的流程:

增加可用的许可证数量。

唤醒在同步队列中等待的线程,使其可以再次尝试获取许可证。

3.3. tryAcquire(int permits) 方法

public boolean tryAcquire(int permits) {

return sync.tryAcquireShared(permits) >= 0;

}

tryAcquire 尝试获取指定数量的许可证,且不会阻塞线程。若许可证不足,它会立即返回 false。在并发限制的场景中,tryAcquire 常用于在没有资源的情况下避免阻塞。

3.4. availablePermits() 方法

public int availablePermits() {

return sync.getPermits();

}

availablePermits 返回当前可用的许可证数量,是一个非阻塞的查询操作。该方法通过 sync 对象直接返回计数值,反映当前的资源可用性。

4、Sync 内部类

Semaphore 通过 Sync 类实现了具体的同步逻辑。Sync 继承了 AbstractQueuedSynchronizer(AQS),并根据公平和非公平性分别定义了 FairSync 和 NonfairSync。

4.1. 公平模式

公平模式下的 FairSync 类遵循 FIFO 原则,保证线程获取许可证的顺序。

static final class FairSync extends Sync {

protected int tryAcquireShared(int acquires) {

for (;;) {

if (hasQueuedPredecessors()) {

return -1; // 其他线程在排队

}

int available = getState();

int remaining = available - acquires;

if (remaining < 0 || compareAndSetState(available, remaining)) {

return remaining;

}

}

}

}

在 tryAcquireShared 方法中,如果当前线程前面有其他排队线程,它会返回 -1 并等待。否则,它会尝试减小许可证数量。

4.2. 非公平模式

非公平模式的 NonfairSync 类会立即尝试获取许可证,而不关心是否有其他线程在排队。这个模式可以提高性能,但可能会导致“饥饿”现象。

static final class NonfairSync extends Sync {

protected int tryAcquireShared(int acquires) {

for (;;) {

int available = getState();

int remaining = available - acquires;

if (remaining < 0 || compareAndSetState(available, remaining)) {

return remaining;

}

}

}

}

在 tryAcquireShared 方法中,非公平模式不检查排队情况,直接尝试获取许可证。如果获取失败,它会反复尝试直到成功。

Semaphore 通过许可证的概念来控制对共享资源的访问数量。核心逻辑通过 Sync 内部类实现,并依赖 AbstractQueuedSynchronizer(AQS)管理同步队列和许可证计数。

公平和非公平模式的选择取决于性能需求和公平性要求。在公平模式下,FIFO 保证了线程访问顺序,但性能稍有影响;非公平模式则倾向于提高吞吐量,但可能会造成某些线程“饥饿”。

二、流量控制的应用

场景:限制对特定资源的并发访问数量,比如流量控制。

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Semaphore;

public class SemaphoreExample {

private static final int THREAD_COUNT = 10;

// 当只允许单线程访问时,可以将 Semaphore 配置为 1,以实现互斥访问。

private static final Semaphore semaphore = new Semaphore(2); // 最大并发数为2

public static void main(String[] args) {

ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);

for (int i = 0; i < THREAD_COUNT; i++) {

executor.execute(() -> {

try {

semaphore.acquire(); // 获取许可证

System.out.println(Thread.currentThread().getName() + ": 访问资源");

Thread.sleep(1000); // 模拟业务处理

} catch (InterruptedException e) {

e.printStackTrace();

} finally {

System.out.println(Thread.currentThread().getName() + ": 资源释放");

semaphore.release(); // 释放许可证

}

});

}

executor.shutdown();

}

}

这个示例创建了一个线程池,并发数为 10,每个线程尝试获取 Semaphore 的许可证,许可证上限为 2。

只有 2个线程可以同时访问资源,超过的线程会被阻塞,直到有线程释放许可证。

在资源访问类场景中,Semaphore 可以有效限制并发数,确保资源不会被过度使用。

运行结果:

pool-1-thread-1: 访问资源

pool-1-thread-2: 访问资源

pool-1-thread-1: 资源释放

pool-1-thread-2: 资源释放

pool-1-thread-3: 访问资源

pool-1-thread-4: 访问资源

pool-1-thread-3: 资源释放

pool-1-thread-4: 资源释放

pool-1-thread-5: 访问资源

pool-1-thread-6: 访问资源

pool-1-thread-6: 资源释放

pool-1-thread-5: 资源释放

pool-1-thread-7: 访问资源

pool-1-thread-8: 访问资源

pool-1-thread-8: 资源释放

pool-1-thread-7: 资源释放

pool-1-thread-9: 访问资源

pool-1-thread-10: 访问资源

pool-1-thread-10: 资源释放

pool-1-thread-9: 资源释放

三、Semaphore的优点

灵活的并发控制

Semaphore 允许我们指定可用的“许可证”数量,从而控制同时访问共享资源的线程数。这对于限制访问次数或并发数量非常有效,能够有效地防止系统资源过载。

实现简单,容易理解

Semaphore 的使用相对简单,通常只需设置最大并发数(即许可证数),并在需要访问共享资源的线程中通过 acquire() 和 release() 方法控制许可证的获取和释放。它的设计简单直观,容易上手。

避免死锁

与某些同步工具(如 synchronized 或 ReentrantLock)相比,Semaphore 并不需要考虑传统的死锁问题。在适当使用的情况下,Semaphore 不会引入死锁,因为它允许线程在获取许可证失败时不会阻塞太久,并能随时释放已获得的许可证。

适用于流量控制和资源池管理

Semaphore 非常适合实现流量控制(如限流器)或资源池管理(如数据库连接池)。它通过限制同时执行的操作数(如数据库连接数、API 请求数)来防止系统过载。

非阻塞(可选)

使用 Semaphore 的 tryAcquire() 方法,线程可以选择不阻塞地尝试获取许可证。如果获取失败,线程可以立即返回,而不需要进入阻塞状态。这对于某些场景(如需要高响应性的请求)非常有用。

四、Semaphore的缺点

容易导致资源耗尽

如果没有适当地释放许可证,Semaphore 可能导致资源耗尽或死锁。由于它是基于许可证计数的,如果某些线程在使用资源后没有调用 release() 方法,许可证会保持“占用”状态,导致其他线程无法继续执行。

不适合复杂的同步操作

Semaphore 主要用于控制访问的线程数量,不能很好地处理更复杂的同步需求(如条件同步)。例如,在一些更复杂的并发场景中,可能需要更精细的控制(如多条件等待),这时 Semaphore 可能不够灵活,需要考虑使用其他同步机制(如 CountDownLatch、CyclicBarrier、ReentrantLock 等)。

较少的语义

Semaphore 本身并没有内建的语义来表示线程在竞争资源时的状态,它仅仅是简单的许可证计数器。对某些复杂的场景来说,Semaphore 可能不能直接表达应用程序的业务逻辑和同步需求,使用起来可能不如其他工具直观。

不保证公平性

Semaphore 默认情况下并不保证公平性,即不能确保按请求顺序分配许可证。在高并发情况下,这可能会导致某些线程长时间无法获得许可证,或者某些线程获得许可证的机会较多。为了实现公平性,可以使用 fair 参数来构造一个公平的 Semaphore,但这可能会带来一定的性能开销。

不具备条件变量的功能

Semaphore 并不像 Object.wait() 和 Object.notify() 那样能够支持复杂的条件变量功能(即线程需要等待某些特定条件的出现才继续执行)。如果需要根据某些条件做同步操作,Semaphore 可能不足够强大,通常需要与其他同步工具(如 Lock 或 Condition)结合使用。

五、最后总结

Semaphore 是一种非常有效的并发控制工具,适用于限制并发访问的场景,比如连接池、流量控制等。但它也有一些局限性,特别是在复杂的同步需求或需要精细条件同步的场景中,可能需要其他更适合的工具来配合使用。其易用性和灵活性使得它在一些场景下非常有用,但同时也需要谨慎管理许可证的获取与释放,以避免出现资源耗尽或死锁等问题。

  • 发表于:
  • 原文链接https://page.om.qq.com/page/O55S-nWHyBAST05SDl9PMQuw0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券