Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >聊聊如何自定义parallelStream的线程池

聊聊如何自定义parallelStream的线程池

原创
作者头像
code4it
发布于 2023-07-08 13:26:37
发布于 2023-07-08 13:26:37
1.3K0
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下parallelStream怎么使用自定义的线程池

ForkJoinPool

java/util/concurrent/ForkJoinPool.java

代码语言:txt
AI代码解释
复制
public class ForkJoinPool extends AbstractExecutorService {

    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }

    private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

    private static ForkJoinPool makeCommonPool() {
        int parallelism = -1;
        ForkJoinWorkerThreadFactory factory = null;
        UncaughtExceptionHandler handler = null;
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
            if (fp != null)
                factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                           getSystemClassLoader().loadClass(fp).newInstance());
            if (hp != null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
        } catch (Exception ignore) {
        }
        if (factory == null) {
            if (System.getSecurityManager() == null)
                factory = new DefaultCommonPoolForkJoinWorkerThreadFactory();
            else // use security-managed default
                factory = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;
        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                                "ForkJoinPool.commonPool-worker-");
    }

}

parallelStream默认使用的是common的ForkJoinPool,可以通过系统属性来设置parallelism等

ForkJoinPoolFactoryBean

org/springframework/scheduling/concurrent/ForkJoinPoolFactoryBean.java

代码语言:txt
AI代码解释
复制
public class ForkJoinPoolFactoryBean implements FactoryBean<ForkJoinPool>, InitializingBean, DisposableBean {

	private boolean commonPool = false;

	private int parallelism = Runtime.getRuntime().availableProcessors();

	private ForkJoinPool.ForkJoinWorkerThreadFactory threadFactory = ForkJoinPool.defaultForkJoinWorkerThreadFactory;

	@Nullable
	private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;

	private boolean asyncMode = false;

	private int awaitTerminationSeconds = 0;

	@Nullable
	private ForkJoinPool forkJoinPool;

	//......

	@Override
	public void destroy() {
		if (this.forkJoinPool != null) {
			// Ignored for the common pool.
			this.forkJoinPool.shutdown();

			// Wait for all tasks to terminate - works for the common pool as well.
			if (this.awaitTerminationSeconds > 0) {
				try {
					this.forkJoinPool.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS);
				}
				catch (InterruptedException ex) {
					Thread.currentThread().interrupt();
				}
			}
		}
	}

}	

spring3.1提供了ForkJoinPoolFactoryBean,可以用于创建并托管forkJoinPool

示例

配置

代码语言:txt
AI代码解释
复制
@Configuration
public class ForkJoinConfig {

    @Bean
    public ForkJoinPoolFactoryBean forkJoinPoolFactoryBean() {
        ForkJoinPoolFactoryBean factoryBean = new ForkJoinPoolFactoryBean();
        factoryBean.setCommonPool(false);
        // NOTE LIFO_QUEUE FOR working steal from tail of queue
        factoryBean.setAsyncMode(true); // NOTE true FIFO_QUEUE, false LIFO_QUEUE
        factoryBean.setParallelism(10);
        // factoryBean.setUncaughtExceptionHandler();
        factoryBean.setAwaitTerminationSeconds(60);
        return factoryBean;
    }
}

使用

代码语言:txt
AI代码解释
复制
    @Autowired
    ForkJoinPoolFactoryBean forkJoinPoolFactoryBean;

    public void streamParallel() throws ExecutionException, InterruptedException {
        List<TodoTask> result = forkJoinPoolFactoryBean.getObject().submit(new Callable<List<TodoTask>>() {
            @Override
            public List<TodoTask> call() throws Exception {
                return IntStream.rangeClosed(1, 20).parallel().mapToObj(i -> {
                    log.info("thread:{}", Thread.currentThread().getName());
                    return new TodoTask(i, "name"+i);
                }).collect(Collectors.toList());
            }
        }).get();
        result.stream().forEach(System.out::println);
    }

common的workerName前缀为ForkJoinPool.commonPool-worker-

自定义的workerName前缀默认为ForkJoinPool- nextPoolId() -worker-

小结

parallelStream默认使用的是commonPool,是static代码块默认初始化,针对个别场景可以自定义ForkJoinPool,将parallelStream作为一个任务丢进去,这样子就不会影响默认的commonPool。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
腾讯的组织能力是什么?| 加速器·荐读
来源/作者:腾讯咨询 ---- 笔者按 有很多文章介绍阿里、华为、字节跳动的组织能力,但鲜有介绍腾讯,这可能和鹅厂低调务实的风格有关,喜欢做,不喜欢说。 因专注在组织能力咨询,浸泡鹅厂5年的我,经常思考「腾讯的组织能力是什么」。在「腾讯咨询」的身份也让我有相对抽离的位置,去观察腾讯和其他优秀大厂相比下的组织能力特色。 从「腾讯没有梦想」说起 我们就先从一篇非常有名的文章说起吧。2018年5月5日,一篇题为「腾讯没有梦想」的文章在腾讯的产品「微信」上广为流传。这篇文章最尖锐的批评是腾讯正在丧失产品
腾讯SaaS加速器
2020/08/13
1.5K0
专访Tony:漫谈技术、文化与人才
3月19日,Tony正式卸任公司执行董事,并将在6个月后卸任CTO职责,而后全面投身到学院专职讲师岗位。《腾讯月刊》在第一时间采访了Tony,Tony也围绕着技术、文化和人才话题,结合16年来的创业经历畅谈了他的看法。 《腾讯月刊》:尽管我们看到了公司的公告、还有Tony给大家的一封信,但是,还是想问一下Tony,腾讯是一个很大的事业平台,你本身又是创始人,为什么要卸任CTO这个岗位,除了邮件里说的,有没有需要补充的? Tony:从27岁开始,我投入了公司16年的创业和发展,这是一个蛮长的时间。作为一家企业
腾讯大讲堂
2018/02/09
8090
乐问如何让鹅厂更有温度?这18个回答告诉你
2020,我们一起度过了极不寻常的一年,但好在无论多少变动正在发生,还有些东西始终不变。正是这些不变,让我们相信着前方总是美好,让我们发现温暖依然在身边。腾讯人每天都上的乐问,记录下了这些珍贵的“不变”。 乐问记录了鹅厂人的生活趣事,沉淀了无数条产品建议,让几万人有了一个跨部门、跨层级沟通的桥梁。在乐问,你能看到最真实最包容的腾讯文化。 倒带2020,让我们来看看这一年,腾讯人在乐问上最有温度的18句话,以及这些金句背后那个接地气、有底蕴的腾讯。 乐问有温度、有温情,这里有公司对员工的关怀,有员工之间
腾讯乐享
2021/01/08
3.6K0
腾讯乐享背后的女人们
她基本总是最早来到公司,除了桌上永远都有三、四杯咖啡,还铺满了打印好的用户需求,商务经理们还要手写大字报,随时“威胁”她!
腾讯乐享
2019/03/12
1.9K0
腾讯乐享背后的女人们
腾讯人才听得懂的12个“鹅语”
瑞雪与活水,一对姐妹花,演尽了H2O的美。 但你可能不知道,在“鹅言鹅语”里,许多话还有另一种美: 瑞雪 瑞雪兆的不止是丰年。在鹅厂,瑞雪代表一种精神。 小到电梯不逆行、班车不插队、食堂不占座,见微知著。大到见义勇为、跳海救人都不在话下。比如,这位。 活水 人在岗位A,期待岗位B,活水一下,让职业生涯流入下一种可能的内部转岗制度。 划重点,第一,活水信息全程保密,即使流动不成功也不会压力山大;第二,一旦面试成功,原部门必须在一定期限内“放人”。 每一只有才华的鹅,都有寻找新舞台的自由。
腾讯大讲堂
2019/05/16
4.3K0
腾讯人才听得懂的12个“鹅语”
腾讯在命运的棋盘上砸下一颗钉子
浅友们好~我是史中,我的日常生活是开撩五湖四海的科技大牛,我会尝试各种姿势,把他们的无边脑洞和温情故事讲给你听。如果你想和我做朋友,不妨加微信(shizhongmax)。
浅黑科技
2022/11/11
1.1K0
腾讯在命运的棋盘上砸下一颗钉子
拼业务办比赛立flag,“乐问”还能这么玩儿?
2020年第一个工作日,也是传统节日腊八节,乐乐祝各位节日快乐呀! 跨年夜时,好友们都在朋友圈发表“年终总结”。而在鹅厂,鹅们选择在乐享的乐问上告别这一年。 #他们晒2019年最有意义的照片# #他们一句话/一个词形容2019# 乐问是鹅厂乐享上最热的应用,提倡你问我答、扁平沟通,在这里,鹅们可以反馈产品问题、吐槽运营策略,可以抒发生活焦虑、咨询情感困惑,甚至可以质疑公司战略,经过多年沉淀,已经成为腾讯人的一种生活方式。 当乐享对外服务各行各业,我们渐渐发现不是所有企业的乐问都会像腾讯这
腾讯乐享
2020/01/13
1.3K0
拼业务办比赛立flag,“乐问”还能这么玩儿?
腾讯乐享社区是什么?如何使用腾讯乐享?如何连接腾讯乐享和企业微信?
作为鹅厂内部人,几乎每天都会打开腾讯乐享。腾讯乐享社区平台可是号称“内部第一流量池”,鹅厂员工会在里面分享知识、组织活动、学习提升,是交流工作生活心得的宝地。腾讯乐享SaaS产品已对外开放5年,服务20万家企业客户,是腾讯千帆的甄选产品伙伴。
阿那个沫
2022/08/23
4K0
腾讯乐享社区是什么?如何使用腾讯乐享?如何连接腾讯乐享和企业微信?
鹅厂7年终有离开之日,记离职鹅厂最后30天的真实心路历程
本文来自公众号“玩不好就别玩”原创分享。本次文章内容为个人真实经历,记录了作者个人离职鹅厂前最后一个月工作交接过程中的心理变化历程。内容虽平凡无奇,但同为程序员的你我,感同身受。
JackJiang
2019/03/01
1.5K0
在腾讯,能学到什么
说来也巧,作者当初加入腾讯的时候,也正是我在腾讯上班的那段时间,转眼间,已是10+年过去。
程序猿石头
2021/04/02
5470
又见一片星空——从外企技术管理者到公益组织创始人
这是一个关于职业旅途的故事,从研究火箭的工程师,到外企白骨精,再到乡村女教师......
叶锦鲤
2018/07/26
5430
又见一片星空——从外企技术管理者到公益组织创始人
520,来听腾讯乐享和客户的成长故事
(乐乐提醒:文末有奖告白活动,记得参与哦~) 2017年的那个夏天,腾讯乐享走出鹅厂,第一次和大家见面。现在,我们和数十万行业客户一起,即将迈入第四个年头。 在2021.5.20这个格外甜蜜的日子,腾讯乐享正式推出全新价值主张—— ●享智慧,快成长 ● 腾讯乐享希望陪伴每一位客户,连接智慧,实现人才成长、组织成长。成长路上,乐享相伴,这就是腾讯乐享对每一位客户的真情表白。 腾讯乐享是如何陪伴不同行业的客户,一起享智慧、快成长的呢? 和富途一起打造知识型企业 富途成立之初,就以打造知识型企业为目
腾讯乐享
2021/05/21
1.1K0
来自腾讯网管专家的鸡汤,了解一下?
"鹅厂网事"由深圳市腾讯计算机系统有限公司技术工程事业群网络平台部运营,我们希望与业界各位志同道合的伙伴交流切磋最新的网络、服务器行业动态信息,同时分享腾讯在网络与服务器领域,规划、运营、研发、服务等层面的实战干货,期待与您的共同成长。 网络平台部以构建敏捷、弹性、低成本的业界领先海量互联网云计算服务平台,为支撑腾讯公司业务持续发展,为业务建立竞争优势、构建行业健康生态而持续贡献价值! Cesczhang 腾讯技术研发类领域专家,2006年硕士毕业后加入腾讯,一直从事网管平台领域的研发建设,当前主要负责Sn
鹅厂网事
2018/04/27
1.3K0
中国SaaS公司的互联网特性
来源:SaaS白夜行|作者:吴昊SaaS ---- 前一阵我被问到:SaaS公司是软件公司、还是互联网公司。 我后来想了想——认为自己是软件公司的,就是软件公司;认为自己是互联网公司的就是互联网公司。 —— 思路决定出路。 今天我们就聊聊,在中国,SaaS公司有哪些区别于传统OP(On-Premises)软件公司的互联网特性? 从产品到市场、销售、实施、服务的价值链条,咱们倒着看。 服务:持续性 “SaaS的本质是续费”。 大家看看,淘宝、微信、百度......哪家典型的to C互联网产品
腾讯SaaS加速器
2021/09/26
6770
腾讯这个内网,很多企业想直接搬走
现在,每一位销售、架构师新员工入职,都会先上两门必修课:基础产品学堂、面客行为标准化。
小腾资讯君
2024/04/07
3120
如何正确看待LeCun工作调整?听听FAIR研究员们现身说法
李根 发自 凹非寺 量子位 报道 | 公众号 QbitAI LeCun离任的余波还未消停。 1月24日(大前天),深度学习三巨头之一的Yann LeCun卸任Facebook AI Researc
量子位
2018/03/21
9160
如何正确看待LeCun工作调整?听听FAIR研究员们现身说法
2021 腾讯技术十大热门文章
今天是 2021 年的最后一天,相信这依然是让我们每个人都印象深刻的一年。全年我们一共发布了 130+ 篇头条技术干货文章,阅读量超过百万。 2021腾讯技术工程文章关键词 这里我们将年度十大热门文章梳理了出来,作为一份小小的新年礼物分享给各位,祝大家虎年快乐~ 以下文章点击图片即可跳转 1、最近大火的「元宇宙」是什么? 摘要:本文介绍了元宇宙的由来和底层技术,探讨海内外资本在这条赛道上的布局,元宇宙将会对哪些行业产生变革的影响,这些影响背后凸显了元宇宙的哪些价值,以及元宇宙逐步实现的过程中监
腾讯技术工程官方号
2021/12/31
1.3K0
程序员黑话
一、招聘行话大全,能听懂证明你是历经磨难的老司机 刚开始投简历时,你总以为是这样的,其实大部分情况下是那样的…… 面试之后,HR让回去等消息,傻傻的等待,半个月以上没有回音,各种焦虑…… 明面上的意思
架构师小秘圈
2018/04/02
2K0
程序员黑话
【腾讯安全峰会】7大BG,9年沉淀,只为一个腾讯梦
经过台风“海马”的一轮肆虐后,这几天深圳的天气格外晴朗。今天,由腾讯安全平台部、腾讯学院举办的“第九届腾讯安全技术峰会”正式揭开了序幕。 说起这次峰会还有个小插曲:由于上周五台风对深圳的正面袭击,峰会改期到了今天。小伙伴们都惊呼:这可真是应了这次峰会的主题——“风起”!那么,这一波“大风”吹来了什么?这场持续了9年的企鹅聚会今年讲什么?现在就随现场小编一起来看看~ 风云际会 7大BG共同吹响安全集结号 一进入会场,就发现峰会的签到环节存在不少巧思。参会嘉宾刷卡签到后,自己的头像就会闪现在大屏幕上,来自7
TEG云端专业号
2018/03/13
4.5K0
【腾讯安全峰会】7大BG,9年沉淀,只为一个腾讯梦
腾讯产品总监:为何我工作10年,内心仍无比恐慌?(强烈推荐)
很开心今天能够站在这里,做为馒头商学院开学典礼的第一位分享嘉宾。今天这个分享主题想过很久,做为早晨的第一课应该给大家醒醒脑,所以我的题目是《产品经理们,五年后,你会失业吗?》。 讲课之前我先做个小互动,在座各位,是90后的请举手?恩,大概占到三分之一人数,80后举手?剩下都是80后,好开心,我和大多数人是一样的。另外,这个话题对于大多数的你们,绝对有意义。 我从业到现在已经十年多了,这个问题是我从业五到七八年时最苦恼的话题。当时觉得这个世界太不友好了,互联网这个行业全是年轻人,30岁以上就压力非常大,我缓
前朝楚水
2018/04/04
3K0
腾讯产品总监:为何我工作10年,内心仍无比恐慌?(强烈推荐)
推荐阅读
相关推荐
腾讯的组织能力是什么?| 加速器·荐读
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档