Future是在多线程系统中应用最多的一个功能, 在异步获取线程处理结果时, 提供了可取消, 可打断, 可超时, 可等待等诸多处理方式.
以如下代码示例,看下JDK是如何实现这些特性的.
自定义Callable任务, 通过线程池提交任务, 并等待执行结果.
public class TestFutureTask {
static class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(10000);
return "Done";
}
}
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
Future<String> future = executorService.submit(new MyCallable());
String result = future.get();
executorService.shutdown();
}
}
Future
从Future的方法声明可以看出它的主要功能.
任务可取消, 可设置超时时间等待结果, 判断任务是否取消, 判断任务是否完成等操作.
public interface Future<V> {
// 取消任务
boolean cancel(boolean mayInterruptIfRunning);
// 判断是否任务已经取消
boolean isCancelled();
// 判断任务是否处理结束
boolean isDone();
// 等待结果
V get() throws InterruptedException, ExecutionException;
// 带超时时间的等待结果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Callable, Future和FutureTask
在线程池(AbstractExecutorService类)提交Callable任务时, 会将Callable任务封装在Future接口的实现类FutureTask对象内.
后续线程任务的所有执行,超时等操作都是基于FutureTask处理的.
线程池submit()方法:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
Future封装Callable方法
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
FutureTask
FutureTask不仅实现了Future接口,也实现了Runnable接口, 也就是说FutureTask可以和Thread一样按照线程方式启动执行.
FutureTask主要变量
1. state
记录Callable运行状态.
流程中isCancelled(), isDone()等判断状态的方法都是根据state处理的.
状态图如下:
NEW: task初始状态
COMPLETING: 当任务结束前设置结果时的一个中间状态.
NORMAL: 表示任务正常结束.
EXCEPTIONAL: 表示任务因异常而结束
CANCELLED: task还未执行前就调用了cancel(false)方法时
INTERRUPTING: task执行当中,调用cancel(true)中断程序时, 任务处于INTERRUPTING状态, 是一个中间状态.
INTERRUPTED: task调用cancel(true)中断程序时会调用interrupt()方法中断线程运行, state状态由INTERRUPTING转变为INTERRUPTED
2. outcome
记录Callable执行结果, 也有可能是执行产生的异常信息;
它本身并不是volatile变量, 在修改时是根据state状态的CAS操作,保证原子性.
例如, 结果设置方法如下:
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
方法中为了提高处理效率采用了直接修改stateOffset地址的方式, 这种方式在和AtomicInteger中value的用法是一样的, 可以参照文章: 并发原子性之Atomic.
3. waiters
阻塞线程列表.
主线程执行get()方法时, 阻塞等待的线程, 可以简单理解为当前执行的主线程.
FutureTask执行流程
在一个异步处理流程中, Callable会被封装成FutureTask, 并最后由线程池分配线程执行, 这里先不考虑线程池是如何分配线程的.
我们只需要知道, 和Thread一样, FutureTask也是被调用start()方法启动线程执行的.
在FutureTask在执行时, 是有两个线程同时执行的:
1.线程池中调用start()方法, 而执行的run()方法, Callable.call()就是被封装在run()方法中的;
2.主线程处理逻辑, 包括get()等待结果, 超时, cancel()取消等操作;
下面分别讲解这两个线程的处理流程
1. run()处理流程
在run()方法执行逻辑中, 会首先回调Callable.call()方法, 并将结果存入outcome, 同时修改对应state状态值.
需要注意的是, 在执行的第一步会先检查(state==NEW), 新任务才会执行处理流程, 这也是FutureTask只能执行一次的原因.
处理流程如下:
注意:流程图中红色部分都是会影响到主线程流程
2. 主线程get()获取结果流程
首先, 根据state状态, 判断主线程是否需要进入线程阻塞流程;
其次, 方法返回的结果会从outcome中获取执行结果;
注意:流程图中红色部分会受到run()流程影响
综上, 在理解的时候, 最好将整个类拆分成两个独立逻辑去理解, 这样受state, outcome, waiters等变量影响较小, 就容易理解的多了.
FutureTask中断
主线程等待超时时, Callable线程并不会中断执行, 是需要调用cancel()方法以及合理设计Callable.call()方法才能正确中断.
下面说下如何才能正确中断Callable线程.
在cancel()方法中, 是调用interrupt()的方式中断线程的.
public boolean cancel(boolean mayInterruptIfRunning) {
Thread t = runner;
t.interrupt();
}
为配合interrupt()方法中断线程, 有两种方式:
1. 需要Callable.call()方法能够抛出InterruptedException才可以;
例如:
public String call() throws Exception {
Thread.sleep(10000);
return "Done";
]}
2. Callable.call()方法能够使用Thread.currentThread().isInterrupted()监控到线程中断状态.
例如:
public String call() throws Exception {
while(!Thread.currentThread().isInterrupted()){
// do somehing
}
return "Done";
}