Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >聊聊如何自定义parallelStream的线程池

聊聊如何自定义parallelStream的线程池

原创
作者头像
code4it
发布于 2023-07-08 13:26:37
发布于 2023-07-08 13:26:37
1.3K0
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下parallelStream怎么使用自定义的线程池

ForkJoinPool

java/util/concurrent/ForkJoinPool.java

代码语言:txt
AI代码解释
复制
public class ForkJoinPool extends AbstractExecutorService {

    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }

    private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

    private static ForkJoinPool makeCommonPool() {
        int parallelism = -1;
        ForkJoinWorkerThreadFactory factory = null;
        UncaughtExceptionHandler handler = null;
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
            if (fp != null)
                factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                           getSystemClassLoader().loadClass(fp).newInstance());
            if (hp != null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
        } catch (Exception ignore) {
        }
        if (factory == null) {
            if (System.getSecurityManager() == null)
                factory = new DefaultCommonPoolForkJoinWorkerThreadFactory();
            else // use security-managed default
                factory = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;
        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                                "ForkJoinPool.commonPool-worker-");
    }

}

parallelStream默认使用的是common的ForkJoinPool,可以通过系统属性来设置parallelism等

ForkJoinPoolFactoryBean

org/springframework/scheduling/concurrent/ForkJoinPoolFactoryBean.java

代码语言:txt
AI代码解释
复制
public class ForkJoinPoolFactoryBean implements FactoryBean<ForkJoinPool>, InitializingBean, DisposableBean {

	private boolean commonPool = false;

	private int parallelism = Runtime.getRuntime().availableProcessors();

	private ForkJoinPool.ForkJoinWorkerThreadFactory threadFactory = ForkJoinPool.defaultForkJoinWorkerThreadFactory;

	@Nullable
	private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;

	private boolean asyncMode = false;

	private int awaitTerminationSeconds = 0;

	@Nullable
	private ForkJoinPool forkJoinPool;

	//......

	@Override
	public void destroy() {
		if (this.forkJoinPool != null) {
			// Ignored for the common pool.
			this.forkJoinPool.shutdown();

			// Wait for all tasks to terminate - works for the common pool as well.
			if (this.awaitTerminationSeconds > 0) {
				try {
					this.forkJoinPool.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS);
				}
				catch (InterruptedException ex) {
					Thread.currentThread().interrupt();
				}
			}
		}
	}

}	

spring3.1提供了ForkJoinPoolFactoryBean,可以用于创建并托管forkJoinPool

示例

配置

代码语言:txt
AI代码解释
复制
@Configuration
public class ForkJoinConfig {

    @Bean
    public ForkJoinPoolFactoryBean forkJoinPoolFactoryBean() {
        ForkJoinPoolFactoryBean factoryBean = new ForkJoinPoolFactoryBean();
        factoryBean.setCommonPool(false);
        // NOTE LIFO_QUEUE FOR working steal from tail of queue
        factoryBean.setAsyncMode(true); // NOTE true FIFO_QUEUE, false LIFO_QUEUE
        factoryBean.setParallelism(10);
        // factoryBean.setUncaughtExceptionHandler();
        factoryBean.setAwaitTerminationSeconds(60);
        return factoryBean;
    }
}

使用

代码语言:txt
AI代码解释
复制
    @Autowired
    ForkJoinPoolFactoryBean forkJoinPoolFactoryBean;

    public void streamParallel() throws ExecutionException, InterruptedException {
        List<TodoTask> result = forkJoinPoolFactoryBean.getObject().submit(new Callable<List<TodoTask>>() {
            @Override
            public List<TodoTask> call() throws Exception {
                return IntStream.rangeClosed(1, 20).parallel().mapToObj(i -> {
                    log.info("thread:{}", Thread.currentThread().getName());
                    return new TodoTask(i, "name"+i);
                }).collect(Collectors.toList());
            }
        }).get();
        result.stream().forEach(System.out::println);
    }

common的workerName前缀为ForkJoinPool.commonPool-worker-

自定义的workerName前缀默认为ForkJoinPool- nextPoolId() -worker-

小结

parallelStream默认使用的是commonPool,是static代码块默认初始化,针对个别场景可以自定义ForkJoinPool,将parallelStream作为一个任务丢进去,这样子就不会影响默认的commonPool。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
聊聊如何自定义parallelStream的线程池
org/springframework/scheduling/concurrent/ForkJoinPoolFactoryBean.java
code4it
2023/08/31
6220
聊聊如何自定义parallelStream的线程池
高并发之——创建线程池居然有这么多方式...
作者个人研发的在高并发场景下,提供的简单、稳定、可扩展的延迟消息队列框架,具有精准的定时任务和延迟队列处理功能。自开源半年多以来,已成功为十几家中小型企业提供了精准定时调度方案,经受住了生产环境的考验。为使更多童鞋受益,现给出开源框架地址:
冰河
2020/10/29
2780
输了!广州某小厂一面,也凉了
之前「后端面经」系列已经分享过北京、深圳、上海等地方的小厂面经,有同学想看看广州的。
小林coding
2024/05/28
2100
输了!广州某小厂一面,也凉了
java线程池(五):ForkJoinPool源码分析之一(外部提交及worker执行过程)
在前文中介绍了如何使用ForkJoinPool和ForkJoin的一些基本原理。现在继续来分析ForkJoin,原本计划从源码开始分析。但是ForkJoinPool的源码太过复杂。后续得分好几部分来讲解。今天先做一个总体的介绍。
冬天里的懒猫
2020/09/27
2.8K0
java线程池(五):ForkJoinPool源码分析之一(外部提交及worker执行过程)
parallelStream的坑,不踩不知道,一踩吓一跳
很多同学喜欢使用lambda表达式,它允许你定义短小精悍的函数,体现你高超的编码水平。当然,这个功能在某些以代码行数来衡量工作量的公司来说,就比较吃亏一些。
xjjdog
2020/09/23
1.1K0
五种线程池的对比与使用
通过源码可以看出底层调用的是ThreadPoolExecutor方法,传入一个同步的阻塞队列实现缓存。
爱撸猫的杰
2020/01/08
1K0
ForkJoin看这篇就够了![通俗易懂]
分治法是计算机领域常用的算法中的其中一个,主要思想就是将将一个规模为N的问题,分解成K个规模较小的子问题,这些子问题相互独立且与原问题性质相同;求解出子问题的解,合并得到原问题的解。
全栈程序员站长
2022/11/05
9040
ForkJoin看这篇就够了![通俗易懂]
阿里代码规约为什么不让使用Executors包装好线程池呢?
在Executors下主要有5个静态方法: 1. Executors.newWorkStealingPool JDK8引入,创建持有足够线程的线程池支持给定的并行度,并通过使用多个队列减少竞争,此构造方法把CPU数量设置为默认的并行度 public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExce
袁新栋-jeff.yuan
2020/08/26
6000
阿里代码规约为什么不让使用Executors包装好线程池呢?
还有年味的文章,ForkJoinPool 大型图文现场
并发工具类我们已经讲了很多,这些工具类的「目标」是让我们只关注任务本身,并且忽视线程间合作细节,简化了并发编程难度的同时,也增加了很多安全性。工具类的对使用者的「目标」虽然一致,但每一个工具类本身都有它独特的应用场景,比如:
用户4172423
2021/02/25
6910
还有年味的文章,ForkJoinPool 大型图文现场
谈谈fork/join实现原理
害,又是一个炒冷饭的时间。fork/join是在jdk1.7中出现的一个并发工作包,其特点是可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。从而达到多线程分发任务,达到高效处理的目的。
烂猪皮
2021/09/02
6760
Fork/Join框架基本使用[通俗易懂]
ava.util.concurrent.ForkJoinPool由Java大师Doug Lea主持编写,它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。本文中对Fork/Join框架的讲解,基于JDK1.8+中的Fork/Join框架实现,参考的Fork/Join框架主要源代码也基于JDK1.8+。
全栈程序员站长
2022/07/22
3780
Fork/Join框架基本使用[通俗易懂]
压测引发的思考——高并发用同步还是异步好?
最近616大促,公司的服务需要进行压力测试,使用了公司自己的压测平台。对生产机器进行了摘流量压测。由于服务都是查询的接口,也算是很好压测的。这篇文章大概描述压测过程过程,主要是压测出的问题的解决以及对ForkJoinPool学习和了解。 (标题党???????)
袁新栋-jeff.yuan
2021/12/07
8650
压测引发的思考——高并发用同步还是异步好?
阿里的面试题带你认识ForkJoinPool
随着在硬件上多核处理器的发展和广泛使用,并发编程成为程序员必须掌握的一门技术,在面试中也经常考查面试者并发相关的知识。
狼王编程
2021/06/01
4850
阿里的面试题带你认识ForkJoinPool
任务拆分计算利器 ForkJoin 框架玩法详解
从 JDK 1.7 开始,引入了一种新的 Fork/Join 线程池框架,它可以把一个大任务拆成多个小任务并行执行,最后汇总执行结果。
Java极客技术
2024/01/19
2190
任务拆分计算利器 ForkJoin 框架玩法详解
【小家java】Java线程池之---ForkJoinPool线程池的使用以及原理
Java 7 引入了一种新的并发框架—— Fork/Join Framework。同时引入了一种新的线程池:ForkJoinPool(ForkJoinPool.coomonPool)
YourBatman
2019/09/03
2.1K0
【小家java】Java线程池之---ForkJoinPool线程池的使用以及原理
JUC并行计算框架 Fork/Join 原理图文详解&代码示例
关键词:divide and conquer algorithm,work-stealing,WorkQueue
一个会写诗的程序员
2020/05/08
3.2K0
大任务拆分,让并行嗨起来!
之前我们学习了线程池ThreadPoolExecutor,它通过对任务队列和线程的有效管理实现了对并发任务的处理。
程序员蜗牛
2024/02/20
3300
大任务拆分,让并行嗨起来!
ForkJoinPool使用及原理
Fork/Join 框架是 Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
leobhao
2022/06/28
1.1K0
ForkJoinPool使用及原理
IO密集型任务使用Java的parallelStream并行流,提高性能及隔离故障,如何自定义线程池
在Java中,parallelStream 是 Java 8 引入的 Stream API 的一部分,它允许并行处理集合中的元素。默认情况下,parallelStream 共享使用默认的 ForkJoinPool 作为其线程池,可能对你的业务影响性能,而且起不到隔离的作用。所以我们需要自定义其使用的线程池。
崔认知
2024/09/13
7280
IO密集型任务使用Java的parallelStream并行流,提高性能及隔离故障,如何自定义线程池
java线程池(六):ForkJoinPool源码分析之二(WorkQueue源码)
在前面介绍了ForkJoinPool的骨架源码之后,我们来看看ForkJoinPool的核心组成。WorkQueue的源码。
冬天里的懒猫
2020/09/28
8120
java线程池(六):ForkJoinPool源码分析之二(WorkQueue源码)
推荐阅读
相关推荐
聊聊如何自定义parallelStream的线程池
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档