ForkJoin 下 JDK 1.7 并行执行任务的,数量越大,效率越高
比如 :大数据 Map Reduce(把大任务拆分成小任务)
ForkJoin 特点: 工作窃取
举例子:
PS: 维护的是双端队列 Deuue
A线程执行任务到 第二个
B线程执行完毕,那么B线程回去讲A线程的东西拿来执行,从而提高效率
认识forkjion
ForkJoin 使用两个类来完成以上两件事情:
task 类 里面编写的是我们继承了 递归任务继承的实现方法
public class forkjoinDemo extends RecursiveTask<Long> {
/* 解决方案 也是有三六九等的,比如案例 求和
* 最低等 就是直接for循环求和
* 中等 使用forkjion
* 高等 stream 并行流
* */
//开始
private long start;
//结束
private long end;
//到多少值,才开始分开任务
private long threshold = 10000L;
public forkjoinDemo(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
//判断超过阈值的时候 开始使用 fork join
if (end - start > threshold) {
long sum = 0L;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
// 求出中间值
long mid = (start - end) / 2;
forkjoinDemo task1 = new forkjoinDemo(start, mid);
//拆分任务,把任务压入线程队列
task1.fork();
forkjoinDemo task2 = new forkjoinDemo(mid + 1, end);
task2.fork();
return task1.join() + task2.join();
}
}
}
测试类 三种方法的速度
public class test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//test1(); 7042;
//test2(); 969
//test3(); 179;
}
public static void test1() {
Long sum = 0L;
long start = System.currentTimeMillis();
for (long i = 1L; i <= 10_0000_0000; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("sum" + sum + "=> 执行时间" + (end - start));
}
public static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new forkjoinDemo(0L, 10_0000_0000L);
ForkJoinTask<Long> submit = forkJoinPool.submit(task);
Long sum = submit.get();
long end = System.currentTimeMillis();
System.out.println("sum" + sum + "=> 执行时间" + (end - start));
}
public static void test3() {
long start = System.currentTimeMillis();
//并行流
long reduce = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
long end = System.currentTimeMillis();
System.out.println("sum" + reduce + "=> 执行时间" + (end - start));
}
}
常见的两种创建线程的方式。一种是直接继承Thread,另外一种就是实现Runnable接口。
这两种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。
从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。
Future模式的核心思想是能够让主线程将原来需要同步等待的这段时间用来做其他的事情。(因为可以异步获得执行结果,所以不用一直同步等待去获得执行结果)
上图简单描述了不使用Future和使用Future的区别,不使用Future模式,主线程在invoke完一些耗时逻辑之后需要等待,这个耗时逻辑在实际应用中可能是一次RPC调用,可能是一个本地IO操作等。B图表达的是使用Future模式之后,我们主线程在invoke之后可以立即返回,去做其他的事情,回头再来看看刚才提交的invoke有没有结果。
当我们得到包含结果的Future时,我们可以使用get方法等待线程完成并获取返回值,注意我加粗的地方,Future的get() 方法会阻塞主线程。即使我们使用isDone()方法轮询去查看线程执行状态,但是这样也非常浪费cpu资源。
在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,结合了Future的优点,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。
CompletableFuture被设计在Java中进行异步编程。异步编程意味着在主线程之外创建一个独立的线程,与主线程分隔开,并在上面运行一个非阻塞的任务,然后通知主线程进展,成功或者失败。
通过这种方式,你的主线程不用为了任务的完成而阻塞/等待,你可以用主线程去并行执行其他的任务。 使用这种并行方式,极大地提升了程序的表现。
有两种格式,一种是supply开头的方法,一种是run开头的方法
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
同步获取结果
public T get()
public T get(long timeout, TimeUnit unit)
public T getNow(T valueIfAbsent)
public T join()
CompletableFuture<Integer> future = new CompletableFuture<>();
Integer integer = future.get();
get() 方法同样会阻塞直到任务完成,上面的代码,主线程会一直阻塞,因为这种方式创建的future从未完成。有兴趣的小伙伴可以打个断点看看,状态会一直是not completed
代码使用案例
public static void main(String[] args) throws ExecutionException, InterruptedException {
没有返回值的异步回调, runAsync
//CompletableFuture completableFuture = CompletableFuture.runAsync(() -> {
// System.out.println(Thread.currentThread().getName() + "runAsync=> Void");
//});
//System.out.println("1111");
获取执行结果
//completableFuture.get();
// 有返回值的
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "runAsync=>integer");
int i = 10 / 0;
return 1024;
});
completableFuture.whenComplete((t, u) -> {
//t是正常的返回结果
//u是返回报错信息
System.out.println("t=>" + t);
System.out.println("u=>" + u);
}).exceptionally((e) -> {
System.out.println(e.getMessage());
return 233;
}).get();
}