Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >IO密集型任务使用Java的parallelStream并行流,提高性能及隔离故障,如何自定义线程池

IO密集型任务使用Java的parallelStream并行流,提高性能及隔离故障,如何自定义线程池

作者头像
崔认知
发布于 2024-09-13 05:55:45
发布于 2024-09-13 05:55:45
87902
代码可运行
举报
文章被收录于专栏:nobodynobody
运行总次数:2
代码可运行

Java中,parallelStream 是 Java 8 引入的 Stream API 的一部分,它允许并行处理集合中的元素。默认情况下,parallelStream 共享使用默认的 ForkJoinPool 作为其线程池,可能对你的业务影响性能,而且起不到隔离的作用。所以我们需要自定义其使用的线程池。

下面列出几种方法设置线程池:

一、设置系统属性:java.util.concurrent.ForkJoinPool.common.parallelism,修改默认共享的ForkJoinPool 的并行数

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static void main(String[] args) throws InterruptedException {
        List<Integer> list = IntStream.range(1, 50).boxed().collect(Collectors.toList());
        list.parallelStream().forEach(t->{
            System.out.println(t+":"+ Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        TimeUnit.HOURS.sleep(1);
    }

并行流使用的默认线程池是:ForkJoinPool.commonPool()

代码语言:javascript
代码运行次数:0
运行
复制
代码语言:javascript
代码运行次数:0
运行
复制
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 ForkJoinPool common的初始化:

其中并行度的值和系统属性:

java.util.concurrent.ForkJoinPool.common.parallelism

有关,如果没配置,则默认和系统cpu核数相关(

Runtime.getRuntime().availableProcessors()获取)。

本机测试,默认并行数为11。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制

System.out.println("ForkJoinPool并行数" + ForkJoinPool.getCommonPoolParallelism());

修改系统属性 ,设置并行数为20:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","20");
 System.out.println("ForkJoinPool并行数:  " + ForkJoinPool.getCommonPoolParallelism());

执行结果:

注意:虽然可以通过设置系统属性修改默认

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
ForkJoinPool common的并行数,提高并行度,但是默认共享使用一个

ForkJoinPool起不到隔离作用,择情况而选择使用。

二、在自定义的ForkJoinPool中运行parallel()操作

通过创建新的ForkJoinPool,设置线程池数目:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
ForkJoinPool forkJoinPool = new ForkJoinPool(20);

然后并行流的执行提交给新建的ForkJoinPool执行:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
forkJoinPool.submit

示例:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.renzhikeji.demo;


import java.util.List;
import java.util.concurrent.*;
import java.util.stream.IntStream;

/**
 * @author 认知科技技术团队
 * 微信公众号:认知科技技术团队
 */
public class JdkDemo {
    public static void main(String[] args) throws InterruptedException {

        List<Integer> list = IntStream.range(1, 50).boxed().toList();
        ForkJoinPool forkJoinPool = new ForkJoinPool(20);
        forkJoinPool.submit(() -> list.parallelStream().forEach(t->{
            System.out.println(t+":"+ Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));

        TimeUnit.HOURS.sleep(1);
    }

}

执行结果:

执行原理:

判断当前线程是否ForkJoinWorkerThread,如果时,则使用当前线程绑定的ForkJoinPool即我们自定义创建的去执行任务。

三、小结

java的parallelStream并行流,可能需要开发者自定义线程池,起到提高性能及隔离故障的作用。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-06-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 认知科技技术团队 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
何时在 Java 中使用并行流-Java快速进阶教程
Java 8引入了Stream API,可以轻松地将集合迭代为数据流。创建并行执行并利用多个处理器内核的流也非常容易。
jack.yang
2025/04/05
930
何时在 Java 中使用并行流-Java快速进阶教程
聊聊如何自定义parallelStream的线程池
org/springframework/scheduling/concurrent/ForkJoinPoolFactoryBean.java
code4it
2023/08/31
6430
聊聊如何自定义parallelStream的线程池
聊聊如何自定义parallelStream的线程池
org/springframework/scheduling/concurrent/ForkJoinPoolFactoryBean.java
code4it
2023/07/08
1.3K0
parallelStream的坑,不踩不知道,一踩吓一跳
很多同学喜欢使用lambda表达式,它允许你定义短小精悍的函数,体现你高超的编码水平。当然,这个功能在某些以代码行数来衡量工作量的公司来说,就比较吃亏一些。
xjjdog
2020/09/23
1.1K0
一个线上bug引出的parallelStream() for循环背后面的ForkJoin ...
公司的一个ETL项目,主要是从Blob上的CSV文件和HDFS平台下载数据并解析后入到业务的Mysql,数据量大概一个小时20个文件左右(基本集中到每个小时的50分左右),每个文件8~20万条数据量,分别入到不同的表, 我们在入库的时候是把文件解析后分成1000条一批批量插入(篇幅有限,这里只聊入库的场景)。用的是jdk1.8的Stream.parallel()的方式并发入库。
TodoCoder
2022/09/23
6490
一个线上bug引出的parallelStream() for循环背后面的ForkJoin ...
Java并行流Parallel Stream与Fork-Join线程池的关系,莫要乱用、滥用并行流
在说stream之前,给大家分享一个数组匹配性能优化技巧,其实平时注意一些编码的优化,对整个系统的性能优化是有很大帮助的,积少成多。往往一个项目都是毁在一行行粗心代码上的,比如这里多占点内存,那里多占点内存,慢慢内存就不够用了,结果就想着升级机器配置。
Bug开发工程师
2019/11/27
11.5K1
java线程池,工作窃取算法
前言 在上一篇《java线程池,阿里为什么不允许使用Executors?》中我们谈及了线程池,同时又发现一个现象,当最大线程数还没有满的时候耗时的任务全部堆积给了单个线程, 代码如下: ThreadP
落跑架构师M
2020/02/11
9300
java线程池,工作窃取算法
关于禁止使用Executors创建线程池的分析
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 注意,这里的重点是 不允许。而不是不建议。可见该规范 背后都是血淋淋的生产事故。
冬天里的懒猫
2020/08/04
1.6K0
关于禁止使用Executors创建线程池的分析
Java并行流陷阱:为什么指定线程池可能是个坏主意
Java Stream 并不支持指定线程池,实际编码中,有些开发者可能会使用一些“技巧”来指定线程池。实际上,所谓的技巧不仅降低了可读性,而且很容易出现bug。本文将分析并行流式编程的设计思想、”技巧“会带来的问题,并提出相关的解决方案。
程序猿川子
2024/11/13
1930
Java并行流陷阱:为什么指定线程池可能是个坏主意
Java8--新特性--串并行流与ForkJoin框架
PS:工作窃取带来的性能提升偏理论,API的复杂性较高,实际研发中可控性来说不如其他API。一般使用最多的就是做数据处理。接口和数据库尽量不要使用,线程如何堵塞了就尴尬了。吐槽下,从JDK1.8以后,JDK的源码越来越难度了,变量都是一个字母。
IT架构圈
2020/11/09
6430
Java8--新特性--串并行流与ForkJoin框架
深入浅出parallelStream
今天小强带来java8的Stream,Stream是java8中新增加的一个特性,被java猿统称为流。
程序员小强
2019/06/03
1.3K0
java函数式编程Function(java函数式编程实战)
JAVA版本最新的目前已经发布到11了,但目前市面上大多数公司依然在使用Java7之前版本的语法,然而这些编程模式已经渐渐的跟不上新时代的技术了。比如时下潮流前沿spring framework5中的响应式编程就是使用到了函数式编程的风格。
全栈程序员站长
2022/08/02
2.3K0
java函数式编程Function(java函数式编程实战)
java线程池,工作窃取算法
前言 在上一篇《java线程池,阿里为什么不允许使用Executors?》中我们谈及了线程池,同时又发现一个现象,当最大线程数还没有满的时候耗时的任务全部堆积给了单个线程, 代码如下: ThreadP
落跑架构师M
2019/12/30
8460
java线程池,工作窃取算法
04-CompletableFuture异步线程 性能
小白和他的朋友门,连续输了10几把游戏, 决定去餐厅吃饭了,3个人,直接点了10盘菜,决定化悲愤为食量
彼岸舞
2022/05/10
3530
血的教训,如何正确使用线程池 submit 和 execute 方法
血的教训之背景:使用线程池对存量数据进行迁移,但是总有一批数据迁移失败,无异常日志打印
好好学java
2019/07/10
3.6K0
设置并行流最大线程数
还可以使用ForkJoinPool.getCommonPoolParallelism()直接获取ForkJoinPool中允许设置的最大线程数
阿超
2022/08/21
1.2K0
设置并行流最大线程数
Java8并行流:执行速度快的飞起!
并且还需要关注多个线程之间共享变量的修改问题。而 Java8 为我们提供了并行流,可以一键开启并行模式。是不是很酷呢?让我们来看看。
程序员白楠楠
2020/12/17
1.4K0
Java函数式编程之Stream流编程
如何区分中间操作和终止操作呢?可以根据操作的返回值类型判断,如果返回值是Stream,则该操作为中间操作。如果返回值不是Stream或者为空,则该操作是终止操作。
端碗吹水
2020/09/23
6210
Java函数式编程之Stream流编程
java8 parallelStream性能测试
默认是Runtime.getRuntime().availableProcessors() - 1,这里为7
code4it
2018/09/17
1.3K0
异步任务编排神器CompletableFuture
但是当异步任务繁多并且复杂,任务间可能存在依赖关系时,Future接口变得不太好用
菜菜的后端私房菜
2024/08/15
3840
推荐阅读
相关推荐
何时在 Java 中使用并行流-Java快速进阶教程
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验