今天为大家带来的是并发设计模式实战系列,第8章Active Object,废话不多说直接开始~
┌───────────────┐ ┌─────────────────┐ ┌───────────────┐
│ Client │ │ Method Request │ │ Scheduler │
│ (同步调用接口) │───>│ (方法封装对象) │───>│ (任务调度器) │
└───────────────┘ └─────────────────┘ └───────────────┘
▲ │
│ ▼
│ ┌───────────────┐
└─────────────────────────────────│ Servant │
│ (实际执行体) │
└───────────────┘
系统组件 | 现实类比 | 核心行为 |
---|---|---|
Client | 顾客 | 下单但不参与烹饪过程 |
Proxy | 服务员 | 接收订单并转交后厨 |
Scheduler | 厨师长 | 安排厨师处理订单队列 |
Servant | 厨师团队 | 实际烹饪操作 |
Future | 取餐号码牌 | 凭此后续获取菜品 |
import java.util.concurrent.*;
// 1. 定义业务接口
interface MyService {
Future<String> process(String data) throws InterruptedException;
}
// 2. 实现Servant(实际执行体)
class MyServant implements MyService {
@Override
public String doProcess(String data) throws InterruptedException {
Thread.sleep(1000); // 模拟耗时操作
return "Processed: " + data.toUpperCase();
}
}
// 3. 方法请求封装(Command模式)
class MethodRequest implements Callable<String> {
private final MyServant servant;
private final String data;
public MethodRequest(MyServant servant, String data) {
this.servant = servant;
this.data = data;
}
@Override
public String call() throws Exception {
return servant.doProcess(data);
}
}
// 4. Active Object代理
class MyServiceProxy implements MyService {
private final ExecutorService scheduler =
Executors.newSingleThreadExecutor(); // 可替换为线程池
private final MyServant servant = new MyServant();
@Override
public Future<String> process(String data) {
System.out.println("[Proxy] 接收请求: " + data);
Future<String> future = scheduler.submit(new MethodRequest(servant, data));
System.out.println("[Proxy] 已提交任务队列");
return future;
}
public void shutdown() {
scheduler.shutdown();
}
}
// 5. 客户端使用
public class ActiveObjectDemo {
public static void main(String[] args) throws Exception {
MyService service = new MyServiceProxy();
// 异步调用
Future<String> future1 = service.process("hello");
Future<String> future2 = service.process("world");
System.out.println("[Client] 提交任务后立即继续其他操作...");
// 获取结果(阻塞直到完成)
System.out.println("[Client] 结果1: " + future1.get());
System.out.println("[Client] 结果2: " + future2.get());
((MyServiceProxy)service).shutdown();
}
}
// 调度器优化:使用带容量的线程池
ThreadPoolExecutor scheduler = new ThreadPoolExecutor(
1, // 核心线程
4, // 最大线程
30, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100), // 防止无限制堆积
new ThreadPoolExecutor.CallerRunsPolicy()
);
// Future增强:使用CompletableFuture
public Future<String> process(String data) {
CompletableFuture<String> future = new CompletableFuture<>();
scheduler.execute(() -> {
try {
String result = servant.doProcess(data);
future.complete(result);
} catch (Exception e) {
future.completeExceptionally(e);
}
});
return future;
}
模式 | 线程管理 | 调用方式 | 适用场景 |
---|---|---|---|
Active Object | 集中调度 | 异步调用 | 需要方法调用顺序控制 |
Half-Sync/Half-Async | 分层管理 | 混合调用 | 高并发I/O+阻塞任务混合 |
Thread-Per-Request | 每次新建线程 | 同步调用 | 简单短任务 |
Reactor | 事件驱动 | 非阻塞 | 纯高并发网络I/O处理 |
特性 | Active Object | 普通线程池 |
---|---|---|
调用控制 | 方法级封装 | Runnable/Callable |
顺序保证 | 严格队列顺序 | 可配置优先级 |
异常处理 | 通过Future获取 | 自行捕获处理 |
资源管理 | 集中调度可控 | 依赖线程池配置 |
class PriorityMethodRequest implements Comparable<PriorityMethodRequest>, Callable<String> {
private int priority; // 优先级字段
@Override
public int compareTo(PriorityMethodRequest o) {
return Integer.compare(o.priority, this.priority);
}
}
// 使用PriorityBlockingQueue
ThreadPoolExecutor scheduler = new ThreadPoolExecutor(
1, 4, 30, TimeUnit.SECONDS,
new PriorityBlockingQueue<>(100)
);
Future<String> future = service.process("data");
try {
String result = future.get(2, TimeUnit.SECONDS); // 设置超时
} catch (TimeoutException e) {
future.cancel(true); // 取消任务
}
// 监控队列积压
int queueSize = ((ThreadPoolExecutor)scheduler).getQueue().size();
// 跟踪方法执行时间
long start = System.nanoTime();
String result = servant.doProcess(data);
long duration = System.nanoTime() - start;
// 扩展为多消费者线程池
class MultiThreadActiveObject implements MyService {
private final ExecutorService scheduler =
Executors.newFixedThreadPool(4); // 多线程调度
// ...其余实现与单线程版本相同...
}
// 适用场景:CPU密集型任务处理
┌───────────────┐ ┌─────────────────┐ ┌───────────────┐
│ Event │ │ Active Object │ │ Reactor │
│ Producer │───>│ (带队列缓冲) │───>│ (非阻塞I/O) │
└───────────────┘ └─────────────────┘ └───────────────┘
class EventDrivenActiveObject {
private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();
private final Reactor reactor = new Reactor();
public void onEvent(Event event) {
eventQueue.offer(event);
}
private void processEvents() {
while (true) {
Event event = eventQueue.take();
reactor.handleEvent(event); // 转交Reactor处理
}
}
}
class RobustMethodRequest implements Callable<String> {
@Override
public String call() {
try {
return servant.doProcess(data);
} catch (Exception e) {
// 1. 记录详细上下文信息
// 2. 触发补偿机制
// 3. 返回兜底结果
return "FallbackResult";
}
}
}
// 使用装饰器模式统一处理
public Future<String> process(String data) {
FutureTask<String> task = new FutureTask<>(
new ExceptionHandlingDecorator(
new MethodRequest(servant, data)
)
);
scheduler.execute(task);
return task;
}
// 根据系统负载动态调整
class AdaptiveScheduler {
private final ThreadPoolExecutor executor;
public void adjustPoolSize() {
if (systemOverloaded()) {
executor.setCorePoolSize(2); // 降级处理
} else {
executor.setCorePoolSize(8); // 正常处理
}
}
private boolean systemOverloaded() {
return executor.getQueue().size() > 50
|| SystemLoadAverage() > 2.0;
}
}
参数 | 低负载场景 | 高并发场景 | 计算密集型场景 |
---|---|---|---|
核心线程数 | CPU核数 | CPU核数×2 | CPU核数+1 |
队列容量 | 100-500 | 1000-5000 | 100-200 |
拒绝策略 | CallerRuns | DiscardOldest | AbortPolicy |
优先级策略 | 关闭 | 业务分级启用 | 计算优先级启用 |
// 通过JMX暴露关键指标
class ActiveObjectMetrics implements ActiveObjectMetricsMBean {
public int getQueueSize() {
return executor.getQueue().size();
}
public double getAvgProcessTime() {
return timer.getMeanRate();
}
}
// 注册MBean
ManagementFactory.getPlatformMBeanServer()
.registerMBean(new ActiveObjectMetrics(), name);
┌───────────────┐ ┌───────────────┐
│ Client Thread │ │ Active Object │
│ (持有锁A) │ │ (等待锁B) │
│ request1() │───────>│ 正在执行 │
└───────────────┘ └───────────────┘
↑ ↑
│ request2()需要锁B │ 需要锁A继续执行
└─────────────────────────┘
解决方案:
lock.tryLock(100, TimeUnit.MILLISECONDS)
// 弱引用持有Future
private final Map<Future<?>, WeakReference<Context>> contextMap =
new ConcurrentHashMap<>();
// 定期清理已完成任务
scheduler.scheduleAtFixedRate(() -> {
contextMap.entrySet().removeIf(e ->
e.getKey().isDone() || e.getValue().get() == null
);
}, 1, 1, TimeUnit.HOURS);
┌───────────────────────┐
│ 订单接收 (Active Object) │
├───────────────────────┤
│ 1. 验证请求合法性 │
│ 2. 生成交易流水号 │
│ 3. 进入风险控制队列 │
└───────────────┬───────┘
↓
┌───────────────────────┐
│ 风控处理 (优先级队列) │
├───────────────────────┤
│ • VIP客户优先处理 │
│ • 黑名单实时拦截 │
└───────────────────────┘
class DeviceManagerProxy implements DeviceAPI {
// 设备命令按优先级处理
private final PriorityBlockingQueue<Command> queue;
public Future<Result> sendCommand(Device device, Command cmd) {
HighPriorityCommand wrappedCmd =
new HighPriorityCommand(device, cmd);
return scheduler.submit(wrappedCmd);
}
private class HighPriorityCommand implements Comparable<HighPriorityCommand> {
// 根据设备类型设置优先级
public int compareTo(HighPriorityCommand o) {
return this.device.isCritical() ? 1 : -1;
}
}
}
通过这十个维度的系统化解析,Active Object模式可覆盖从基础实现到高级优化的全场景需求。关键点总结:
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有