前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >聊聊如何自定义parallelStream的线程池

聊聊如何自定义parallelStream的线程池

作者头像
code4it
发布于 2023-08-31 06:27:49
发布于 2023-08-31 06:27:49
62900
代码可运行
举报
文章被收录于专栏:码匠的流水账码匠的流水账
运行总次数:0
代码可运行

本文主要研究一下parallelStream怎么使用自定义的线程池

ForkJoinPool

java/util/concurrent/ForkJoinPool.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class ForkJoinPool extends AbstractExecutorService {

    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }

    private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

    private static ForkJoinPool makeCommonPool() {
        int parallelism = -1;
        ForkJoinWorkerThreadFactory factory = null;
        UncaughtExceptionHandler handler = null;
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
            if (fp != null)
                factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                           getSystemClassLoader().loadClass(fp).newInstance());
            if (hp != null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
        } catch (Exception ignore) {
        }
        if (factory == null) {
            if (System.getSecurityManager() == null)
                factory = new DefaultCommonPoolForkJoinWorkerThreadFactory();
            else // use security-managed default
                factory = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;
        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                                "ForkJoinPool.commonPool-worker-");
    }

}

parallelStream默认使用的是common的ForkJoinPool,可以通过系统属性来设置parallelism等

ForkJoinPoolFactoryBean

org/springframework/scheduling/concurrent/ForkJoinPoolFactoryBean.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class ForkJoinPoolFactoryBean implements FactoryBean<ForkJoinPool>, InitializingBean, DisposableBean {

    private boolean commonPool = false;

    private int parallelism = Runtime.getRuntime().availableProcessors();

    private ForkJoinPool.ForkJoinWorkerThreadFactory threadFactory = ForkJoinPool.defaultForkJoinWorkerThreadFactory;

    @Nullable
    private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;

    private boolean asyncMode = false;

    private int awaitTerminationSeconds = 0;

    @Nullable
    private ForkJoinPool forkJoinPool;

    //......

    @Override
    public void destroy() {
        if (this.forkJoinPool != null) {
            // Ignored for the common pool.
            this.forkJoinPool.shutdown();

            // Wait for all tasks to terminate - works for the common pool as well.
            if (this.awaitTerminationSeconds > 0) {
                try {
                    this.forkJoinPool.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS);
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

}    

spring3.1提供了ForkJoinPoolFactoryBean,可以用于创建并托管forkJoinPool

示例

配置

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Configuration
public class ForkJoinConfig {

    @Bean
    public ForkJoinPoolFactoryBean forkJoinPoolFactoryBean() {
        ForkJoinPoolFactoryBean factoryBean = new ForkJoinPoolFactoryBean();
        factoryBean.setCommonPool(false);
        // NOTE LIFO_QUEUE FOR working steal from tail of queue
        factoryBean.setAsyncMode(true); // NOTE true FIFO_QUEUE, false LIFO_QUEUE
        factoryBean.setParallelism(10);
        // factoryBean.setUncaughtExceptionHandler();
        factoryBean.setAwaitTerminationSeconds(60);
        return factoryBean;
    }
}

使用

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    @Autowired
    ForkJoinPoolFactoryBean forkJoinPoolFactoryBean;

    public void streamParallel() throws ExecutionException, InterruptedException {
        List<TodoTask> result = forkJoinPoolFactoryBean.getObject().submit(new Callable<List<TodoTask>>() {
            @Override
            public List<TodoTask> call() throws Exception {
                return IntStream.rangeClosed(1, 20).parallel().mapToObj(i -> {
                    log.info("thread:{}", Thread.currentThread().getName());
                    return new TodoTask(i, "name"+i);
                }).collect(Collectors.toList());
            }
        }).get();
        result.stream().forEach(System.out::println);
    }

common的workerName前缀为ForkJoinPool.commonPool-worker- 自定义的workerName前缀默认为ForkJoinPool- nextPoolId() -worker-

小结

parallelStream默认使用的是commonPool,是static代码块默认初始化,针对个别场景可以自定义ForkJoinPool,将parallelStream作为一个任务丢进去,这样子就不会影响默认的commonPool。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-07-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
聊聊如何自定义parallelStream的线程池
org/springframework/scheduling/concurrent/ForkJoinPoolFactoryBean.java
code4it
2023/07/08
1.3K0
高并发之——创建线程池居然有这么多方式...
作者个人研发的在高并发场景下,提供的简单、稳定、可扩展的延迟消息队列框架,具有精准的定时任务和延迟队列处理功能。自开源半年多以来,已成功为十几家中小型企业提供了精准定时调度方案,经受住了生产环境的考验。为使更多童鞋受益,现给出开源框架地址:
冰河
2020/10/29
2780
【小家java】Java线程池之---ForkJoinPool线程池的使用以及原理
Java 7 引入了一种新的并发框架—— Fork/Join Framework。同时引入了一种新的线程池:ForkJoinPool(ForkJoinPool.coomonPool)
YourBatman
2019/09/03
2.1K0
【小家java】Java线程池之---ForkJoinPool线程池的使用以及原理
ForkJoin看这篇就够了![通俗易懂]
分治法是计算机领域常用的算法中的其中一个,主要思想就是将将一个规模为N的问题,分解成K个规模较小的子问题,这些子问题相互独立且与原问题性质相同;求解出子问题的解,合并得到原问题的解。
全栈程序员站长
2022/11/05
9090
ForkJoin看这篇就够了![通俗易懂]
java线程池(五):ForkJoinPool源码分析之一(外部提交及worker执行过程)
在前文中介绍了如何使用ForkJoinPool和ForkJoin的一些基本原理。现在继续来分析ForkJoin,原本计划从源码开始分析。但是ForkJoinPool的源码太过复杂。后续得分好几部分来讲解。今天先做一个总体的介绍。
冬天里的懒猫
2020/09/27
2.9K0
java线程池(五):ForkJoinPool源码分析之一(外部提交及worker执行过程)
五种线程池的对比与使用
通过源码可以看出底层调用的是ThreadPoolExecutor方法,传入一个同步的阻塞队列实现缓存。
爱撸猫的杰
2020/01/08
1K0
阿里代码规约为什么不让使用Executors包装好线程池呢?
在Executors下主要有5个静态方法: 1. Executors.newWorkStealingPool JDK8引入,创建持有足够线程的线程池支持给定的并行度,并通过使用多个队列减少竞争,此构造方法把CPU数量设置为默认的并行度 public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExce
袁新栋-jeff.yuan
2020/08/26
6030
阿里代码规约为什么不让使用Executors包装好线程池呢?
谈谈fork/join实现原理
害,又是一个炒冷饭的时间。fork/join是在jdk1.7中出现的一个并发工作包,其特点是可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。从而达到多线程分发任务,达到高效处理的目的。
烂猪皮
2021/09/02
6820
阿里的面试题带你认识ForkJoinPool
随着在硬件上多核处理器的发展和广泛使用,并发编程成为程序员必须掌握的一门技术,在面试中也经常考查面试者并发相关的知识。
狼王编程
2021/06/01
4940
阿里的面试题带你认识ForkJoinPool
JUC并行计算框架 Fork/Join 原理图文详解&代码示例
关键词:divide and conquer algorithm,work-stealing,WorkQueue
一个会写诗的程序员
2020/05/08
3.3K0
任务拆分计算利器 ForkJoin 框架玩法详解
从 JDK 1.7 开始,引入了一种新的 Fork/Join 线程池框架,它可以把一个大任务拆成多个小任务并行执行,最后汇总执行结果。
Java极客技术
2024/01/19
2270
任务拆分计算利器 ForkJoin 框架玩法详解
还有年味的文章,ForkJoinPool 大型图文现场
并发工具类我们已经讲了很多,这些工具类的「目标」是让我们只关注任务本身,并且忽视线程间合作细节,简化了并发编程难度的同时,也增加了很多安全性。工具类的对使用者的「目标」虽然一致,但每一个工具类本身都有它独特的应用场景,比如:
用户4172423
2021/02/25
6960
还有年味的文章,ForkJoinPool 大型图文现场
啥?用了并行流还更慢了
Java 8给大家带来了一个非常便捷的多线程工具:并行流,一改往日Java多线程繁琐的编程规范,只需要一行代码,就可以让一个多线程跑起来,似乎让很多人忘记了被多线程支配的恐惧,这篇文章给大家分享一个真实的生产故障,由于在消费消息的处理器中使用了Java 8的并行流,导致集群消费消息的能力急速下降,造成线上消息堆积,引发故障。可能有朋友会好奇,到底是什么场景让并行流起了反作用?
苦味代码
2021/03/11
5780
啥?用了并行流还更慢了
压测引发的思考——高并发用同步还是异步好?
最近616大促,公司的服务需要进行压力测试,使用了公司自己的压测平台。对生产机器进行了摘流量压测。由于服务都是查询的接口,也算是很好压测的。这篇文章大概描述压测过程过程,主要是压测出的问题的解决以及对ForkJoinPool学习和了解。 (标题党???????)
袁新栋-jeff.yuan
2021/12/07
8740
压测引发的思考——高并发用同步还是异步好?
大任务拆分,让并行嗨起来!
之前我们学习了线程池ThreadPoolExecutor,它通过对任务队列和线程的有效管理实现了对并发任务的处理。
程序员蜗牛
2024/02/20
3340
大任务拆分,让并行嗨起来!
异步任务编排神器CompletableFuture
但是当异步任务繁多并且复杂,任务间可能存在依赖关系时,Future接口变得不太好用
菜菜的后端私房菜
2024/08/15
3580
Fork/Join框架原理和使用探秘 顶
Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。也是当前执行速度最快的并发框架。
算法之名
2019/08/26
1.1K0
Fork/Join框架原理和使用探秘
                                                                            顶
(juc系列)forkjoin框架源码学习
JUC系列提供的又一个线程池,采用分治思想,及工作窃取策略,能获得更高的并发性能.
呼延十
2021/10/18
4750
parallelStream的坑,不踩不知道,一踩吓一跳
很多同学喜欢使用lambda表达式,它允许你定义短小精悍的函数,体现你高超的编码水平。当然,这个功能在某些以代码行数来衡量工作量的公司来说,就比较吃亏一些。
xjjdog
2020/09/23
1.1K0
Fork/Join框架基本使用[通俗易懂]
ava.util.concurrent.ForkJoinPool由Java大师Doug Lea主持编写,它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。本文中对Fork/Join框架的讲解,基于JDK1.8+中的Fork/Join框架实现,参考的Fork/Join框架主要源代码也基于JDK1.8+。
全栈程序员站长
2022/07/22
3840
Fork/Join框架基本使用[通俗易懂]
推荐阅读
相关推荐
聊聊如何自定义parallelStream的线程池
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验