Fork-Join是什么? Fork-Join是Java的一个并行框架,主要的作用是将大任务分解为多个小任务,交由多个工作线程运行,最后将小任务的计算结果汇总,得到大任务的结果。
Fork-join框架中主要的类以及类间关系 Fork-Join框架中,最主要的类有两个: ForkJoinTask类和ForkJoinPool,前者的实例要由后者的实例提交运行 如下图所示:
一: ForkJoinTask类: 有两个子类:RecursiveTask类和RecursiveAction类
1.ForkJoinTask是能在ForkJoinPool中运行的类的抽象父类,一般不会直接使用它,而是使用继承自它的两个子类:RecursiveTask类和RecursiveAction类 2.外层的ForkJoinTask实例一般会创建多个子ForkJoinTask实例,用于执行多个子任务,而只有最外层的ForkJoinTask被ForkJoinPool提交(submit)后,这些任务才会开始执行 3.ForkJoinTask类似于线程,但是相较之下更加轻量,最外层的ForkJoinTask和它内部的子ForkJoinTask只需要ForkJoinPool中的少量线程就可以顺利执行(这和ForkJoinPool的工作窃取机制有关,下面将讲述) 4.RecursiveTask类可以返回计算结果,RecursiveAction类不能返回计算结果, 它们都要求重写compute方法,如有返回结果则在compute方法中返回,见下面示例:
public class ForkJoinDemo extends RecursiveTask {
@Override
protected Integer compute() {
return '返回结果';
}
}
fork方法:调用fork方法后,该任务将在在ForkJoinTask被提交的ForkJoinPool线程池中异步地执行
join方法: 阻塞直到compute方法内的计算完成,并返回计算的结果
get方法: 等待外层ForkJoinTask内的所有子任务完成,并返回计算的最终结果
二. ForkJoinPool类:一个特殊的线程池,通过调用submit方法提交运行一个ForkJoinTask实例, ForkJoinPool和其他线程池的不同之处在于,ForkJoinPool采用了一种叫做“工作窃取”(work-stealing)的方法去提高线程池运行任务的效率。FookJoinPool和普通的线程池的对比如下
普通的线程池: 一个任务只能由一个线程运行
ForkJoinPool: 一个任务在被一个线程执行的时候,其他空闲的线程可能也会帮忙执行这个任务,即“工作窃取”。提高了线程池运行的效率
ForkJoinTask中添加的任务被分配到ForkJoinPool中的一些双端队列中,每个双端队列由一个工作线程处理,如果这个队列中的任务还没处理完毕,当前的工作线程会从分配给它的工作队列的头部取得任务执行,如果该工作队列中的任务已经处理完毕,则这个空闲下来的线程会从分配给其他线程的工作队列的尾部“窃取”任务来执行。
// 创建一个Fork-join线程池
ForkJoinPool fjPool = new ForkJoinPool();
// 提交运行一个ForkJoinTask实例
fjPool.submit('a ForkJoinTask instance');
ForkJoinTask和ForkJoinPool的使用示例
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class ForkJoinDemo extends RecursiveTask {
private static int minNum = 5;
private int startNum;
private int endNum;
public ForkJoinDemo (int startNum, int endNum) {
this.startNum = startNum;
this.endNum = endNum;
}
@Override
protected Integer compute() {
// 当子任务的工作量划分的足够小的时候,执行计算
if (endNum - startNum<minNum) {
int result = 0;
for (int i=startNum;i<=endNum;i++) {
result+=i;
}
return result;
}
// 将大任务划分成多个子任务
int midNum = (startNum + endNum)/2;
ForkJoinDemo fj1= new ForkJoinDemo(startNum,midNum);
ForkJoinDemo fj2= new ForkJoinDemo(midNum+1,endNum);
fj1.fork();
fj2.fork();
int resultLeft = (Integer) fj1.join();
int resultRight = (Integer) fj2.join();
// 返回本次计算的结果
return resultLeft + resultRight;
}
public static void main (String args []) {
ForkJoinDemo fjdemo = new ForkJoinDemo(1, 100);
// 创建一个Fork-join线程池
ForkJoinPool fjPool = new ForkJoinPool();
// 提交运行一个ForkJoinTask实例
fjPool.submit(fjdemo);
try {
// 阻塞直到取得结果
System.out.println(fjdemo.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}