今天为大家带来的是并发设计模式实战系列,第十七章信号量(Semaphore),废话不多说直接开始~
┌───────────────┐ ┌───────────────┐
│ Resource │───┬──>│ Semaphore │
│ Pool │ │ │ (计数器+队列) │
└───────────────┘ │ └───────────────┘
│
┌───────────────┐ │
│ Thread │<──┘
│ Request │
└───────────────┘
acquire()
:许可-1(当>0时立即返回,=0时线程阻塞)release()
:许可+1(唤醒等待队列中的线程)new Semaphore(N)
初始化许可数tryAcquire(timeout)
防止死锁系统组件 | 现实类比 | 核心行为 |
---|---|---|
Semaphore | 剩余车位显示屏 | 显示可用车位数量 |
acquire() | 车辆进入抬杆 | 占用车位(数量-1) |
release() | 车辆离开 | 释放车位(数量+1) |
等待队列 | 入口排队车辆 | 按到达顺序或抢车位 |
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
public class SemaphoreDemo {
// 数据库连接池实现
static class ConnectionPool {
private final Semaphore semaphore;
private final BlockingQueue<Connection> pool;
public ConnectionPool(int poolSize) {
this.semaphore = new Semaphore(poolSize, true); // 公平模式
this.pool = new LinkedBlockingQueue<>(poolSize);
for (int i = 0; i < poolSize; i++) {
pool.add(new Connection("Conn-" + i));
}
}
public Connection getConnection() throws InterruptedException {
semaphore.acquire(); // 如果没有许可则阻塞
return pool.take();
}
public void releaseConnection(Connection conn) {
pool.offer(conn);
semaphore.release(); // 释放许可
}
}
static class Connection {
private String name;
public Connection(String name) { this.name = name; }
@Override
public String toString() { return name; }
}
// 模拟业务操作
public static void main(String[] args) {
final ConnectionPool pool = new ConnectionPool(3);
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
try {
Connection conn = pool.getConnection();
System.out.println(Thread.currentThread().getName()
+ " 获取连接: " + conn);
// 模拟业务操作
Thread.sleep(1000);
pool.releaseConnection(conn);
System.out.println(Thread.currentThread().getName()
+ " 释放连接: " + conn);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
// 创建信号量(公平模式 vs 非公平模式)
new Semaphore(permits, true);
// 超时获取许可(避免死锁)
semaphore.tryAcquire(2, TimeUnit.SECONDS);
// 一次性获取多个许可
semaphore.acquire(3); // 需要3个许可才能继续
工具 | 特点 | 适用场景 |
---|---|---|
Semaphore | 控制资源访问数量 | 连接池、限流 |
CountDownLatch | 一次性栅栏 | 多线程任务汇总 |
CyclicBarrier | 可重复使用的栅栏 | 多阶段并行计算 |
ReentrantLock | 独占锁 | 临界区精细控制 |
策略 | 优点 | 缺点 |
---|---|---|
公平模式 | 避免线程饥饿 | 吞吐量较低 |
非公平模式 | 吞吐量高 | 可能造成线程饥饿 |
多许可申请 | 支持复杂资源分配 | 容易导致死锁 |
可中断获取 | 响应线程中断 | 需要处理中断异常 |
// 动态扩容(JDK没有直接方法,需通过包装实现)
class ResizableSemaphore {
private final ReentrantLock lock = new ReentrantLock();
private Semaphore semaphore;
public ResizableSemaphore(int permits) {
this.semaphore = new Semaphore(permits);
}
public void setPermits(int newPermits) {
lock.lock();
try {
int delta = newPermits - semaphore.availablePermits();
if (delta > 0) {
semaphore.release(delta); // 增加许可
} else {
semaphore.reducePermits(-delta); // 减少许可
}
} finally {
lock.unlock();
}
}
}
// 监控关键指标
int availablePermits = semaphore.availablePermits();
int queueLength = semaphore.getQueueLength(); // 等待线程数
// 使用信号量限制任务提交速率
ExecutorService executor = Executors.newCachedThreadPool();
Semaphore rateLimiter = new Semaphore(10); // 最大10并发
executor.execute(() -> {
rateLimiter.acquire();
try {
// 执行任务...
} finally {
rateLimiter.release();
}
});
好的!我将延续原有结构,从 第六部分 开始扩展信号量(Semaphore)的高级特性和工程实践细节。
┌───────────────────┐
│ Semaphore │
│ (Sync继承AQS) │
│ - state=permits │
│ - 共享模式 │
└─────────┬─────────┘
│
┌─────────▼─────────┐
│ NonFairSync │ 或 │ FairSync │
│ - 直接竞争许可 │ │ - FIFO队列 │
└───────────────────┘
NonFairSync.tryAcquireShared()
允许插队FairSync.tryAcquireShared()
检查是否有等待队列// JDK 17中的非公平获取逻辑
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining)) {
return remaining; // 负数表示获取失败
}
}
}
问题类型 | 现象 | 解决方案 |
---|---|---|
许可泄漏 | 可用许可逐渐减少 | 必须用try-finally块保证释放 |
线程饥饿 | 低优先级线程长期未执行 | 使用公平模式 |
死锁 | 多许可申请顺序不当 | 统一申请/释放顺序 |
响应中断 | 阻塞线程无法响应中断 | 使用acquireInterruptibly() |
// 1. 打印信号量状态
System.out.println("可用许可: " + semaphore.availablePermits());
System.out.println("等待线程: " + semaphore.getQueueLength());
// 2. 使用JMX监控
ManagementFactory.getPlatformMBeanServer()
.registerMBean(semaphore, new ObjectName("java.util.concurrent:type=Semaphore"));
ExecutorService executor = Executors.newCachedThreadPool();
Semaphore limiter = new Semaphore(20); // 最大20并发
void submitTask(Runnable task) {
limiter.acquire();
executor.execute(() -> {
try {
task.run();
} finally {
limiter.release();
}
});
}
Semaphore semaphore = new Semaphore(5);
CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
semaphore.acquire();
try {
// 阶段1:受限资源操作
doPhase1Work();
latch.countDown();
// 阶段2:等待其他线程完成
latch.await();
doPhase2Work();
} finally {
semaphore.release();
}
}).start();
}
最大许可数 = (目标TPS × 平均耗时(秒)) / (1 - 冗余系数)
示例:
- 目标TPS=1000,平均耗时=0.1s,冗余系数=0.3
- 许可数 = (1000×0.1)/(1-0.3) ≈ 143
场景 | 许可数设置建议 | 公平性选择 |
---|---|---|
数据库连接池 | 物理连接数的1.2倍 | 非公平 |
API限流 | 根据SLAB配额设置 | 公平 |
文件IO控制 | CPU核心数×2 | 非公平 |
class DynamicSemaphore {
private final ReentrantLock lock = new ReentrantLock();
private Semaphore semaphore;
public DynamicSemaphore(int permits) {
this.semaphore = new Semaphore(permits);
}
public void addPermits(int delta) {
lock.lock();
try {
if (delta > 0) {
semaphore.release(delta);
} else {
int reduction = -delta;
semaphore.acquire(reduction); // 减少可用许可
}
} finally {
lock.unlock();
}
}
}
class AutoReleaseSemaphore {
private final Semaphore semaphore;
private final ScheduledExecutorService scheduler;
public AutoReleaseSemaphore(int permits) {
this.semaphore = new Semaphore(permits);
this.scheduler = Executors.newSingleThreadScheduledExecutor();
}
public void acquireWithTimeout(long timeout, TimeUnit unit)
throws InterruptedException {
semaphore.acquire();
scheduler.schedule(() -> {
semaphore.release();
System.out.println("自动释放许可");
}, timeout, unit);
}
}
Kafka Producer使用Semaphore实现:
- 未确认请求数限制(max.in.flight.requests.per.connection)
- 内存缓冲区阻塞控制(buffer.memory)
<!-- 在server.xml中配置信号量式连接限制 -->
<Connector
executor="threadPool"
maxConnections="10000" <!-- 信号量控制 -->
acceptCount="100" <!-- 等待队列 -->
/>