前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >并发设计模式实战系列(20):扇出/扇入模式(Fan-Out/Fan-In)(完结篇)

并发设计模式实战系列(20):扇出/扇入模式(Fan-Out/Fan-In)(完结篇)

作者头像
摘星.
发布于 2025-05-20 07:11:12
发布于 2025-05-20 07:11:12
12100
代码可运行
举报
文章被收录于专栏:博客专享博客专享
运行总次数:0
代码可运行
🌟 大家好,我是摘星! 🌟

今天为大家带来的是并发设计模式实战系列,第二十章扇出/扇入模式(Fan-Out/Fan-In),废话不多说直接开始~

一、核心原理深度拆解

1. 数据流拓扑结构
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Producer   │ ──> │  Workers    │ ──> │  Collector  │
 (Main Task) (并行处理) (结果聚合)   │
└─────────────┘     └─────────────┘     └─────────────┘
      │                   ▲                   ▲
      └───────────────────┴───────────────────┘
            Fan-Out                  Fan-In
  • Fan-Out阶段:主任务将工作拆分为多个子任务,分发给Worker线程并行处理
  • Fan-In阶段:Worker处理结果通过通道返回,由Collector统一聚合
2. 并发控制关键点
  • 工作窃取(Work Stealing):使用ForkJoinPool实现负载均衡
  • 背压控制(Backpressure):通过有界队列防止内存溢出
  • 结果排序:若需保持顺序,需使用CompletionService按完成顺序获取

二、生活化类比:快餐店厨房

系统组件

现实类比

核心行为

Producer

订单接收台

将套餐拆解为汉堡、薯条等单品

Workers

多个厨师工作站

并行制作不同食物组件

Collector

装配台

将完成组件组合成完整套餐

  • 效率提升:3个厨师同时做汉堡/薯条/饮料(Fan-Out) → 装配员组合(Fan-In)
  • 异常处理:某个厨师生病时,其他厨师可分担其任务

三、Java代码实现(生产级Demo)

1. 完整可运行代码
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class FanOutFanInDemo {

    // 模拟CPU密集型任务
    static int processItem(int item) {
        try {
            Thread.sleep(ThreadLocalRandom.current().nextInt(100));
        } catch (InterruptedException e) {}
        return item * item; // 平方计算
    }

    public static void main(String[] args) throws Exception {
        List<Integer> inputs = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
        
        // 方法1:使用CompletableFuture(自动Fan-In)
        System.out.println("=== CompletableFuture方案 ===");
        long start = System.currentTimeMillis();
        CompletableFuture<Void> future = CompletableFuture.allOf(
            inputs.stream()
                .map(item -> CompletableFuture.supplyAsync(() -> processItem(item)))
                .toArray(CompletableFuture[]::new)
        ).thenApply(ignored -> {
            System.out.println("所有任务完成");
            return null;
        });
        future.get();
        System.out.printf("耗时: %dms\n", System.currentTimeMillis() - start);

        // 方法2:使用ForkJoinPool(工作窃取)
        System.out.println("\n=== ForkJoinPool方案 ===");
        ForkJoinPool pool = new ForkJoinPool(4);
        start = System.currentTimeMillis();
        List<Integer> results = pool.submit(() -> 
            inputs.parallelStream()
                .map(FanOutFanInDemo::processItem)
                .collect(Collectors.toList())
        ).get();
        System.out.println("结果数量: " + results.size());
        System.out.printf("耗时: %dms\n", System.currentTimeMillis() - start);

        // 方法3:手动控制(精确管理)
        System.out.println("\n=== 手动控制方案 ===");
        ExecutorService workers = Executors.newFixedThreadPool(4);
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(workers);
        start = System.currentTimeMillis();
        
        // Fan-Out阶段
        inputs.forEach(item -> completionService.submit(() -> processItem(item)));
        
        // Fan-In阶段
        List<Integer> orderedResults = new ArrayList<>(inputs.size());
        for (int i = 0; i < inputs.size(); i++) {
            try {
                orderedResults.add(completionService.take().get());
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        System.out.println("结果数量: " + orderedResults.size());
        System.out.printf("耗时: %dms\n", System.currentTimeMillis() - start);
        workers.shutdown();
    }
}
2. 关键配置说明
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 1. CompletableFuture默认使用ForkJoinPool.commonPool()
// 可指定自定义线程池:
CompletableFuture.supplyAsync(() -> task, customPool);

// 2. ForkJoinPool参数选择:
// - 并行度通常设为CPU核心数
new ForkJoinPool(Runtime.getRuntime().availableProcessors());

// 3. 手动方案背压控制:
// 使用有界队列+CallerRunsPolicy
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<>(100), 
    new ThreadPoolExecutor.CallerRunsPolicy());

四、横向对比表格

1. 不同实现方案对比

方案

优点

缺点

适用场景

CompletableFuture

声明式编程,自动聚合

难以精确控制线程

简单并行任务

ForkJoinPool

工作窃取算法效率高

适合CPU密集型

递归分治任务

手动控制

完全控制流程和资源

代码复杂度高

需要严格顺序/背压

2. 性能影响因素

因素

影响程度

优化建议

任务粒度

★★★★

每个任务10ms-100ms为宜

线程池大小

★★★

IO密集型:大线程池 CPU密集型:小线程池

结果传输开销

★★

避免在Worker和Collector间传大对象

任务倾斜

★★

使用工作窃取线程池


五、高级优化技巧

1. 动态批次处理
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 根据系统负载调整批次大小
int batchSize = Runtime.getRuntime().availableProcessors() * 2;
List<List<Integer>> batches = Lists.partition(inputs, batchSize);
2. 错误处理增强
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 为CompletableFuture添加异常处理
future.exceptionally(ex -> {
    System.err.println("任务失败: " + ex.getMessage());
    return null;
});
3. 混合模式
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// IO密集型 + CPU密集型混合
ExecutorService ioPool = Executors.newCachedThreadPool();
ExecutorService cpuPool = Executors.newWorkStealingPool();

CompletableFuture.supplyAsync(() -> queryDB(), ioPool)
    .thenApplyAsync(data -> processData(data), cpuPool);
4. 监控指标
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// ForkJoinPool监控
ForkJoinPool pool = (ForkJoinPool) ForkJoinPool.commonPool();
System.out.println("活跃线程数: " + pool.getActiveThreadCount());
System.out.println("窃取任务数: " + pool.getStealCount());


六、扇出/扇入模式变体与扩展模式

1. 分阶段扇出(Pipeline Fan-Out)
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
┌─────────┐     ┌─────────┐     ┌─────────┐  
│ Stage 1  │───> │ Stage 2  │───> │ Stage 3 (Fan-Out)<───  (Fan-Out)<───  (Fan-In) │  
└─────────┘     └─────────┘     └─────────┘  

特点

  • 每个阶段既是上一阶段的消费者,又是下一阶段的生产者
  • 适用于多级处理流水线(如ETL场景)

Java实现

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 使用Phaser协调多阶段  
Phaser phaser = new Phaser(3);  
ExecutorService[] stagePools = {Executors.newFixedThreadPool(4), ...};  

// Stage 1  
stagePools[0].submit(() -> {  
    List<Data> stage1Results = dataList.parallelStream()  
        .map(d -> processStage1(d)).collect(Collectors.toList());  
    phaser.arriveAndAwaitAdvance();  
    return stage1Results;  
});  

// Stage 2同理...  
2. 动态扇出(Dynamic Forking)

场景:根据运行时条件决定分支数量

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CompletableFuture<?>[] futures = inputList.stream()  
    .map(input -> {  
        if (needFork(input)) {  
            return CompletableFuture.allOf(  
                taskA(input),  
                taskB(input)  
            );  
        } else {  
            return taskC(input);  
        }  
    }).toArray(CompletableFuture[]::new);  

七、生产环境问题解决方案

1. 死锁预防矩阵

风险点

检测方法

解决方案

Fan-Out过度

监控线程池队列堆积

限制最大分片数量(如Guava的RateLimiter)

Fan-In阻塞

线程dump显示Collector阻塞

使用CompletionService.poll(timeout)

资源竞争

JFR显示高锁竞争

采用无锁队列(ConcurrentLinkedQueue)

结果顺序错乱

业务校验失败

使用AtomicInteger序号标记原始顺序

2. 性能调优检查表
  1. Fan-Out阶段
    • 分片大小是否大于CPU核心数×2
    • 避免在分片逻辑中执行阻塞IO
  1. Worker阶段
    • 每个任务耗时是否在50-500ms黄金区间
    • 是否避免修改共享状态
  1. Fan-In阶段
    • 聚合操作时间复杂度是否优于O(n²)
    • 是否使用并发集合(如ConcurrentHashMap)

八、与其他模式的组合应用

1. 反应式编程整合
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// Reactor + Fan-Out  
Flux.fromIterable(inputList)  
    .parallel()  
    .runOn(Schedulers.parallel())  
    .map(item -> process(item))  // Fan-Out  
    .sequential()  
    .collectList()               // Fan-In  
    .subscribe(System.out::println);  
2. 与生产者-消费者模式结合
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
┌──────────────┐     ┌──────────────┐     ┌──────────────┐  
│ Global Queue  │───> │ Local Queues │───> │ Workers      │  
 (优先级队列)<───  (工作窃取)<───  (弹性线程池)  │  
└──────────────┘     └──────────────┘     └──────────────┘  

优势

  • 全局队列解决跨节点负载均衡
  • 本地队列减少锁竞争

九、可视化监控方案

1. Prometheus监控指标
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// Fan-Out速率  
Counter.Builder("fanout_requests_total")  
    .labelNames("source")  
    .register(registry);  

// Fan-In延迟直方图  
Histogram.Builder("fanin_duration_seconds")  
    .buckets(0.1, 0.5, 1)  
    .register(registry);  
2. 线程池监控看板

指标

健康阈值

Grafana表达式

活跃线程数

≤核心线程数×1.5

thread_pool_active_threads{}

任务积压量

≤队列容量50%

thread_pool_queue_size{}

工作窃取次数

≥100/分钟

forkjoin_steals_total


十、模式选择决策树

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
graph TD  
    A[需要严格顺序?] -->|| B[选择手动控制方案]  
    A -->|| C{任务类型?}  
    C -->|CPU密集型| D[ForkJoinPool]  
    C -->|IO密集型| E[CompletableFuture+自定义线程池]  
    C -->|混合型| F[分阶段差异化处理]  
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-05-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
并发设计模式实战系列(16):屏障(Barrier)
今天为大家带来的是并发设计模式实战系列,第十六章屏障(Barrier),废话不多说直接开始~
摘星.
2025/05/20
1370
并发设计模式实战系列(1):半同步/半异步模式
今天为大家带来的是并发设计模式实战系列,第一章半同步/半异步(Half-Sync/Half-Async)模式,废话不多说直接开始~
摘星.
2025/05/20
840
【小家Java】Future、FutureTask、CompletionService、CompletableFuture解决多线程并发中归集问题的效率对比
开启线程执行任务,不管是使用Runnable(无返回值不支持上报异常)还是Callable(有返回值支持上报异常)接口,都可以轻松实现。那么如果是开启线程池并需要获取结果归集的情况下,如何实现,以及优劣?
YourBatman
2019/09/03
2.7K1
【小家Java】Future、FutureTask、CompletionService、CompletableFuture解决多线程并发中归集问题的效率对比
并发设计模式实战系列(2):领导者/追随者模式
今天为大家带来的是并发设计模式实战系列,第二章领导者/追随者(Leader/Followers)模式,废话不多说直接开始~
摘星.
2025/05/20
550
《Java面试题集中营》- Java并发
Jdk提供了stop()方法用于强制停止线程,但官方并不建议使用,因为强制停止线程会导致线程使用的资源,比如文件描述符、网络连接处于不正常的状态。建议使用标志位的方式来终止线程,如果线程中有使用无限期的阻塞方式,比如wait()没有设置超时时间,就只能使用interrupt()方法来终止线程
阿提说说
2024/07/14
1120
并发设计模式实战系列(4):线程池
今天为大家带来的是并发设计模式实战系列,第四章线程池(Thread Pool)​,废话不多说直接开始~
摘星.
2025/05/20
1520
并发设计模式实战系列(3):工作队列
今天为大家带来的是并发设计模式实战系列,第三章工作队列(Work Queue)​​,废话不多说直接开始~
摘星.
2025/05/20
1570
并发设计模式实战系列(19):监视器(Monitor)
今天为大家带来的是并发设计模式实战系列,第十九章监视器(Monitor),废话不多说直接开始~
摘星.
2025/05/20
970
并发设计模式实战系列(9):消息传递(Message Passing)
今天为大家带来的是并发设计模式实战系列,第九章消息传递(Message Passing),废话不多说直接开始~
摘星.
2025/05/20
1290
并发设计模式实战系列(9):消息传递(Message Passing)
并行化:你的高并发大杀器
想必热爱游戏的同学小时候,都幻想过要是自己要是能像鸣人那样会多重影分身之术,就能一边打游戏一边上课了,可惜漫画就是漫画,现实中并没有这个技术,你要么只有老老实实的上课,要么就只有逃课去打游戏了。虽然在现实中我们无法实现多重影分身这样的技术,但是我们可以在计算机世界中实现我们这样的愿望。
程序猿DD
2018/10/26
6340
Java并行流陷阱:为什么指定线程池可能是个坏主意
Java Stream 并不支持指定线程池,实际编码中,有些开发者可能会使用一些“技巧”来指定线程池。实际上,所谓的技巧不仅降低了可读性,而且很容易出现bug。本文将分析并行流式编程的设计思想、”技巧“会带来的问题,并提出相关的解决方案。
程序猿川子
2024/11/13
1730
Java并行流陷阱:为什么指定线程池可能是个坏主意
快速上手JUC下常见并发容器
多线程环境下Java提供的一些简单容器都无法使用了,此时要用到JUC中的容器,由于 ConcurrentHashMap 是高频考点,用到也比较多因此着重写过了,其余的容器就看今天咯。
sowhat1412
2020/12/14
7750
快速上手JUC下常见并发容器
并发设计模式实战系列(8):Active Object
今天为大家带来的是并发设计模式实战系列,第8章Active Object,废话不多说直接开始~
摘星.
2025/05/20
1350
并发设计模式实战系列(6):读写锁
今天为大家带来的是并发设计模式实战系列,第六章读写锁模式​​,废话不多说直接开始~
摘星.
2025/05/20
930
并发设计模式实战系列(6):读写锁
多线程进阶--JUC并发编程
https://blog.csdn.net/weixin_44502509/article/details/106872957
hhss
2021/02/12
6440
多线程进阶--JUC并发编程
杰哥教你面试之一百问系列:java中高级多线程concurrent的使用
提到多线程,当然要熟悉java提供的各种多线程相关的并发包了,而java.util.concurrent就是最最经常会使用到的,那么关于concurrent的面试题目有哪些呢?一起来看看吧。
程序那些事
2023/09/07
4060
成果被他人窃取_工作窃取模式
ForkJoin(分支合并)是jdk1.7之后出来的,并行执行任务,提高效率,用在大数据量场景下。
全栈程序员站长
2022/09/30
3630
成果被他人窃取_工作窃取模式
04-CompletableFuture异步线程 性能
小白和他的朋友门,连续输了10几把游戏, 决定去餐厅吃饭了,3个人,直接点了10盘菜,决定化悲愤为食量
彼岸舞
2022/05/10
3420
死磕 java线程系列之ForkJoinPool深入解析
随着在硬件上多核处理器的发展和广泛使用,并发编程成为程序员必须掌握的一门技术,在面试中也经常考查面试者并发相关的知识。
彤哥
2019/11/14
7210
创建线程的 8 种方法
无论是为了提高程序运行效率,还是为了处理复杂的并发任务,我们都需要在代码中使用线程。
苏三说技术
2024/12/30
4220
创建线程的 8 种方法
推荐阅读
相关推荐
并发设计模式实战系列(16):屏障(Barrier)
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验