前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >可取消的异步任务: FutureTask

可取消的异步任务: FutureTask

作者头像
一个架构师
发布2022-06-20 20:04:21
发布2022-06-20 20:04:21
80000
代码可运行
举报
运行总次数:0
代码可运行

Future是在多线程系统中应用最多的一个功能, 在异步获取线程处理结果时, 提供了可取消, 可打断, 可超时, 可等待等诸多处理方式.

以如下代码示例,看下JDK是如何实现这些特性的.

自定义Callable任务, 通过线程池提交任务, 并等待执行结果.

代码语言:javascript
代码运行次数:0
运行
复制
public class TestFutureTask {
    static class MyCallable implements Callable<String> {
        @Override
        public String call() throws Exception {
            Thread.sleep(10000);
            return "Done";
        }
    }
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Future<String> future = executorService.submit(new MyCallable());
        String result = future.get();
        executorService.shutdown();
    }
}

Future

从Future的方法声明可以看出它的主要功能.

任务可取消, 可设置超时时间等待结果, 判断任务是否取消, 判断任务是否完成等操作.

代码语言:javascript
代码运行次数:0
运行
复制
public interface Future<V> {
// 取消任务
    boolean cancel(boolean mayInterruptIfRunning);
    // 判断是否任务已经取消
    boolean isCancelled();
    // 判断任务是否处理结束
    boolean isDone();
// 等待结果
    V get() throws InterruptedException, ExecutionException;
// 带超时时间的等待结果
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Callable, Future和FutureTask

在线程池(AbstractExecutorService类)提交Callable任务时, 会将Callable任务封装在Future接口的实现类FutureTask对象内.

后续线程任务的所有执行,超时等操作都是基于FutureTask处理的.

线程池submit()方法:

代码语言:javascript
代码运行次数:0
运行
复制
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

Future封装Callable方法

代码语言:javascript
代码运行次数:0
运行
复制
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

FutureTask

FutureTask不仅实现了Future接口,也实现了Runnable接口, 也就是说FutureTask可以和Thread一样按照线程方式启动执行.

FutureTask主要变量

1. state

记录Callable运行状态.

流程中isCancelled(), isDone()等判断状态的方法都是根据state处理的.

状态图如下:

NEW: task初始状态

COMPLETING: 当任务结束前设置结果时的一个中间状态.

NORMAL: 表示任务正常结束.

EXCEPTIONAL: 表示任务因异常而结束

CANCELLED: task还未执行前就调用了cancel(false)方法时

INTERRUPTING: task执行当中,调用cancel(true)中断程序时, 任务处于INTERRUPTING状态, 是一个中间状态.

INTERRUPTED: task调用cancel(true)中断程序时会调用interrupt()方法中断线程运行, state状态由INTERRUPTING转变为INTERRUPTED

2. outcome

记录Callable执行结果, 也有可能是执行产生的异常信息;

它本身并不是volatile变量, 在修改时是根据state状态的CAS操作,保证原子性.

例如, 结果设置方法如下:

代码语言:javascript
代码运行次数:0
运行
复制
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

方法中为了提高处理效率采用了直接修改stateOffset地址的方式, 这种方式在和AtomicInteger中value的用法是一样的, 可以参照文章: 并发原子性之Atomic.

3. waiters

阻塞线程列表.

主线程执行get()方法时, 阻塞等待的线程, 可以简单理解为当前执行的主线程.

FutureTask执行流程

在一个异步处理流程中, Callable会被封装成FutureTask, 并最后由线程池分配线程执行, 这里先不考虑线程池是如何分配线程的.

我们只需要知道, 和Thread一样, FutureTask也是被调用start()方法启动线程执行的.

在FutureTask在执行时, 是有两个线程同时执行的:

1.线程池中调用start()方法, 而执行的run()方法, Callable.call()就是被封装在run()方法中的;

2.主线程处理逻辑, 包括get()等待结果, 超时, cancel()取消等操作;

下面分别讲解这两个线程的处理流程

1. run()处理流程

在run()方法执行逻辑中, 会首先回调Callable.call()方法, 并将结果存入outcome, 同时修改对应state状态值.

需要注意的是, 在执行的第一步会先检查(state==NEW), 新任务才会执行处理流程, 这也是FutureTask只能执行一次的原因.

处理流程如下:

注意:流程图中红色部分都是会影响到主线程流程

2. 主线程get()获取结果流程

首先, 根据state状态, 判断主线程是否需要进入线程阻塞流程;

其次, 方法返回的结果会从outcome中获取执行结果;

注意:流程图中红色部分会受到run()流程影响

综上, 在理解的时候, 最好将整个类拆分成两个独立逻辑去理解, 这样受state, outcome, waiters等变量影响较小, 就容易理解的多了.

FutureTask中断

主线程等待超时时, Callable线程并不会中断执行, 是需要调用cancel()方法以及合理设计Callable.call()方法才能正确中断.

下面说下如何才能正确中断Callable线程.

在cancel()方法中, 是调用interrupt()的方式中断线程的.

代码语言:javascript
代码运行次数:0
运行
复制
public boolean cancel(boolean mayInterruptIfRunning) {
                Thread t = runner;
                    t.interrupt();
}

为配合interrupt()方法中断线程, 有两种方式:

1. 需要Callable.call()方法能够抛出InterruptedException才可以;

例如:

代码语言:javascript
代码运行次数:0
运行
复制
public String call() throws Exception {
    Thread.sleep(10000);
    return "Done";
]}

2. Callable.call()方法能够使用Thread.currentThread().isInterrupted()监控到线程中断状态.

例如:

代码语言:javascript
代码运行次数:0
运行
复制
public String call() throws Exception {
   while(!Thread.currentThread().isInterrupted()){
      // do somehing
   }
   return "Done";
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-07-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 从码农的全世界路过 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档