今天为大家带来的是并发设计模式实战系列,第二十章扇出/扇入模式(Fan-Out/Fan-In),废话不多说直接开始~
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Producer │ ──> │ Workers │ ──> │ Collector │
│ (Main Task) │ │ (并行处理) │ │ (结果聚合) │
└─────────────┘ └─────────────┘ └─────────────┘
│ ▲ ▲
└───────────────────┴───────────────────┘
Fan-Out Fan-In
ForkJoinPool
实现负载均衡CompletionService
按完成顺序获取系统组件 | 现实类比 | 核心行为 |
---|---|---|
Producer | 订单接收台 | 将套餐拆解为汉堡、薯条等单品 |
Workers | 多个厨师工作站 | 并行制作不同食物组件 |
Collector | 装配台 | 将完成组件组合成完整套餐 |
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
public class FanOutFanInDemo {
// 模拟CPU密集型任务
static int processItem(int item) {
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(100));
} catch (InterruptedException e) {}
return item * item; // 平方计算
}
public static void main(String[] args) throws Exception {
List<Integer> inputs = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
// 方法1:使用CompletableFuture(自动Fan-In)
System.out.println("=== CompletableFuture方案 ===");
long start = System.currentTimeMillis();
CompletableFuture<Void> future = CompletableFuture.allOf(
inputs.stream()
.map(item -> CompletableFuture.supplyAsync(() -> processItem(item)))
.toArray(CompletableFuture[]::new)
).thenApply(ignored -> {
System.out.println("所有任务完成");
return null;
});
future.get();
System.out.printf("耗时: %dms\n", System.currentTimeMillis() - start);
// 方法2:使用ForkJoinPool(工作窃取)
System.out.println("\n=== ForkJoinPool方案 ===");
ForkJoinPool pool = new ForkJoinPool(4);
start = System.currentTimeMillis();
List<Integer> results = pool.submit(() ->
inputs.parallelStream()
.map(FanOutFanInDemo::processItem)
.collect(Collectors.toList())
).get();
System.out.println("结果数量: " + results.size());
System.out.printf("耗时: %dms\n", System.currentTimeMillis() - start);
// 方法3:手动控制(精确管理)
System.out.println("\n=== 手动控制方案 ===");
ExecutorService workers = Executors.newFixedThreadPool(4);
CompletionService<Integer> completionService = new ExecutorCompletionService<>(workers);
start = System.currentTimeMillis();
// Fan-Out阶段
inputs.forEach(item -> completionService.submit(() -> processItem(item)));
// Fan-In阶段
List<Integer> orderedResults = new ArrayList<>(inputs.size());
for (int i = 0; i < inputs.size(); i++) {
try {
orderedResults.add(completionService.take().get());
} catch (ExecutionException e) {
e.printStackTrace();
}
}
System.out.println("结果数量: " + orderedResults.size());
System.out.printf("耗时: %dms\n", System.currentTimeMillis() - start);
workers.shutdown();
}
}
// 1. CompletableFuture默认使用ForkJoinPool.commonPool()
// 可指定自定义线程池:
CompletableFuture.supplyAsync(() -> task, customPool);
// 2. ForkJoinPool参数选择:
// - 并行度通常设为CPU核心数
new ForkJoinPool(Runtime.getRuntime().availableProcessors());
// 3. 手动方案背压控制:
// 使用有界队列+CallerRunsPolicy
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(100),
new ThreadPoolExecutor.CallerRunsPolicy());
方案 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
CompletableFuture | 声明式编程,自动聚合 | 难以精确控制线程 | 简单并行任务 |
ForkJoinPool | 工作窃取算法效率高 | 适合CPU密集型 | 递归分治任务 |
手动控制 | 完全控制流程和资源 | 代码复杂度高 | 需要严格顺序/背压 |
因素 | 影响程度 | 优化建议 |
---|---|---|
任务粒度 | ★★★★ | 每个任务10ms-100ms为宜 |
线程池大小 | ★★★ | IO密集型:大线程池 CPU密集型:小线程池 |
结果传输开销 | ★★ | 避免在Worker和Collector间传大对象 |
任务倾斜 | ★★ | 使用工作窃取线程池 |
// 根据系统负载调整批次大小
int batchSize = Runtime.getRuntime().availableProcessors() * 2;
List<List<Integer>> batches = Lists.partition(inputs, batchSize);
// 为CompletableFuture添加异常处理
future.exceptionally(ex -> {
System.err.println("任务失败: " + ex.getMessage());
return null;
});
// IO密集型 + CPU密集型混合
ExecutorService ioPool = Executors.newCachedThreadPool();
ExecutorService cpuPool = Executors.newWorkStealingPool();
CompletableFuture.supplyAsync(() -> queryDB(), ioPool)
.thenApplyAsync(data -> processData(data), cpuPool);
// ForkJoinPool监控
ForkJoinPool pool = (ForkJoinPool) ForkJoinPool.commonPool();
System.out.println("活跃线程数: " + pool.getActiveThreadCount());
System.out.println("窃取任务数: " + pool.getStealCount());
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Stage 1 │───> │ Stage 2 │───> │ Stage 3 │
│ (Fan-Out)│<─── │ (Fan-Out)│<─── │ (Fan-In) │
└─────────┘ └─────────┘ └─────────┘
特点:
Java实现:
// 使用Phaser协调多阶段
Phaser phaser = new Phaser(3);
ExecutorService[] stagePools = {Executors.newFixedThreadPool(4), ...};
// Stage 1
stagePools[0].submit(() -> {
List<Data> stage1Results = dataList.parallelStream()
.map(d -> processStage1(d)).collect(Collectors.toList());
phaser.arriveAndAwaitAdvance();
return stage1Results;
});
// Stage 2同理...
场景:根据运行时条件决定分支数量
CompletableFuture<?>[] futures = inputList.stream()
.map(input -> {
if (needFork(input)) {
return CompletableFuture.allOf(
taskA(input),
taskB(input)
);
} else {
return taskC(input);
}
}).toArray(CompletableFuture[]::new);
风险点 | 检测方法 | 解决方案 |
---|---|---|
Fan-Out过度 | 监控线程池队列堆积 | 限制最大分片数量(如Guava的RateLimiter) |
Fan-In阻塞 | 线程dump显示Collector阻塞 | 使用CompletionService.poll(timeout) |
资源竞争 | JFR显示高锁竞争 | 采用无锁队列(ConcurrentLinkedQueue) |
结果顺序错乱 | 业务校验失败 | 使用AtomicInteger序号标记原始顺序 |
// Reactor + Fan-Out
Flux.fromIterable(inputList)
.parallel()
.runOn(Schedulers.parallel())
.map(item -> process(item)) // Fan-Out
.sequential()
.collectList() // Fan-In
.subscribe(System.out::println);
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Global Queue │───> │ Local Queues │───> │ Workers │
│ (优先级队列) │<─── │ (工作窃取) │<─── │ (弹性线程池) │
└──────────────┘ └──────────────┘ └──────────────┘
优势:
// Fan-Out速率
Counter.Builder("fanout_requests_total")
.labelNames("source")
.register(registry);
// Fan-In延迟直方图
Histogram.Builder("fanin_duration_seconds")
.buckets(0.1, 0.5, 1)
.register(registry);
指标 | 健康阈值 | Grafana表达式 |
---|---|---|
活跃线程数 | ≤核心线程数×1.5 | thread_pool_active_threads{} |
任务积压量 | ≤队列容量50% | thread_pool_queue_size{} |
工作窃取次数 | ≥100/分钟 | forkjoin_steals_total |
graph TD
A[需要严格顺序?] -->|是| B[选择手动控制方案]
A -->|否| C{任务类型?}
C -->|CPU密集型| D[ForkJoinPool]
C -->|IO密集型| E[CompletableFuture+自定义线程池]
C -->|混合型| F[分阶段差异化处理]
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有