Fork/Join框架是JDK7提供的一个用于并行执行任务的框架,其基本思路就是将一个较大的任务,按照一定规则,划分成若干个逻辑相同的子任务并发执行。待子任务执行后,再汇总每个任务的结果,最后返回最终的结果。例如,我们要对10000个元素求和,可以拆分成10个子任务,每个子任务计算1000个元素的和,最后将所有子任务的结果加起来,就是最终结果。
其流程如下:
Fork/Join是一种典型的分治算法,类似归并排序、快速排序等,先将待解决的问题拆分成若干个逻辑相同的子问题,待所有子问题求解完毕,原问题就自然处理完成。
Fork/Join框架主要涉及一下几个核心类:
下面使用一个示例,演示Fork/Join框架的用法。
该示例要实现的功能是,给定一个路径,打印出该路径下满足一定条件的文件(不包含目录)。
package william.concurrent.forkjoin;
import org.springframework.util.CollectionUtils;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
/**
* @Auther: ZhangShenao
* @Date: 2018/11/18 12:21
* @Description:
*/
public class ListFilesTask extends RecursiveAction{
private String path;
private Predicate<File> filePredicate;
public ListFilesTask(String path, Predicate<File> filePredicate) {
this.path = path;
this.filePredicate = filePredicate;
}
@Override
protected void compute() {
File file = new File(path);
if (file == null){
return;
}
File[] files = file.listFiles();
if (files == null || files.length <= 0){
return;
}
List<ListFilesTask> tasks = new ArrayList<>();
for (File f : files){
if (f.isFile()){
if (filePredicate.test(f)){
System.err.println(f.getName());
}
}
else if (f.isDirectory()){
tasks.add(new ListFilesTask(f.getPath(),filePredicate));
}
}
if (!CollectionUtils.isEmpty(tasks)){
invokeAll(tasks);
}
}
public static void main(String[] args) throws InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
ListFilesTask listFilesTask = new ListFilesTask("/", (file -> file.getName().endsWith(".java")));
forkJoinPool.submit(listFilesTask);
forkJoinPool.awaitTermination(5, TimeUnit.SECONDS);
forkJoinPool.shutdown();
}
}
该示例以纯异步的方式执行,由于给定路径下的文件可能较多,因此对于每一个目录,都fork一个子线程去处理该目录下的文件。
工作窃取算法是指某个线程从其他线程的任务队列中,窃取任务来执行。Fork/Join框架就采用了该算法。之所以这样处理,是因为当一个任务被fork了多个子线程执行后,子线程的处理速度可能不一致,处理快的线程与其等待,不如帮其它处理慢的线程去执行一些任务。
为了减少访问同一个任务队列所引发的竞争,任务队列一般都采用双端队列,工作线程从队头取任务执行,而窃取的线程从队尾窃取任务。
工作窃取的执行流程如下: