首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >多线程获取结果还在使用Future轮询获取结果吗?CompletionService快来了解下吧。

多线程获取结果还在使用Future轮询获取结果吗?CompletionService快来了解下吧。

原创
作者头像
java金融
修改于 2020-07-09 02:00:20
修改于 2020-07-09 02:00:20
1.5K00
代码可运行
举报
文章被收录于专栏:java金融java金融
运行总次数:0
代码可运行

背景

二胖上次写完参数校验(《二胖写参数校验的坎坷之路》)之后,领导一直不给他安排其他开发任务,就一直让他看看代码熟悉业务。二胖每天上班除了偶尔跟坐在隔壁的前端小姐姐聊聊天,就是看看这写枯燥无味的业务代码,无聊的一匹。虽然二胖已是久经职场的老油条了,但是看到同事们的周报都写的满满的,而自己的周报,就一两行,熟悉了什么功能。心里还是慌得一匹,毕竟公司不养闲人啊。于是乎二胖终于鼓起勇气为了向领导表明自己的上进心,主动向领导要开发任务。领导一看这小伙子这么有上进心,于是就到任务看板里面挑了一个业务逻辑比较简单的任务分配给了二胖。二胖拿到这个任务屁颠屁颠的回到座位。任务比较简单,就是通过爬虫去爬取某些卖机票(某猪、某携、某团等)的网站的一些机票,然后保存到数据库

同步入库

二胖拿到任务,三下五除二就把任务完成了。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 public static void main(String[] args) throws InterruptedException {
        String mouZhuFlightPrice = getMouZhuFlightPrice();
        String mouXieFlightPrice = getMouXieFlightPrice();
        String mouTuanFlightPrice = getMouTuanFlightPrice();
        saveDb(mouZhuFlightPrice);
        saveDb(mouXieFlightPrice);
        saveDb(mouTuanFlightPrice);
    }


    /**
     * 模拟请求某猪网站 爬取机票信息
     *
     *
     * @return
     * @throws InterruptedException
     */
    public static String getMouZhuFlightPrice() throws InterruptedException {
        // 模拟请求某猪网站 爬取机票信息
        Thread.sleep(10000);
        return "获取到某猪网站的机票信息了";
    }

    /**
     * 模拟请求某携网站 爬取机票信息
     *
     * @return
     * @throws InterruptedException
     */
    public static String getMouXieFlightPrice() throws InterruptedException {
        // 模拟请求某携网站 爬取机票信息
        Thread.sleep(5000);
        return "获取到某携网站的机票信息了";
    }


    /**
     * 模拟请求团网站 爬取机票信息
     *
     * @return
     * @throws InterruptedException
     */
    public static String getMouTuanFlightPrice() throws InterruptedException {
        // 模拟请求某团网站 爬取机票信息
        Thread.sleep(3000);
        return "获取到某团网站的机票信息了";
    }

    /**
     * 保存DB
     *
     * @param flightPriceList
     */
    public static void saveDb(String flightPriceList) {
            // 解析字符串 进行异步入库
    }

这次二胖学乖了,任务完成了先去找下坐他对面的技术大拿(看他那发际线就知道了)同事“二狗”让二狗大拿帮忙指点一二,看看代码是否还能有优化的地方。毕竟领导对代码的性能、以及代码的优雅是有要求的。领导多次在部门的周会上提到让我们多看看“二狗”写的代码,学习下人家写代码的优雅、抽象、封装等等。二狗大概的瞄了下二胖写的代码,提出了个小小的建议“这个代码可以采用多线程来优化下哦,你看某猪(CVM)这个网站耗时是拿到结果需要10s,其他的耗时都比它短,先有结果的我们可以先处理的,不需要等到大家都返回了再来处理的”。

轮循futureList获取结果

幸好二胖对多线程了解一点点,于是乎采用future的方式来实现。二胖使用一个List来保存每个任务返回的Future,然后去轮询这些Future,直到每个Future都已完成。由于需要先完成的任务需要先执行,且不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,所以在调用get方式时,需要将超时时间设置为0

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  public static void main(String[] args) {
        int taskSize = 3;
        Future<String> mouZhuFlightPriceFuture = executor.submit(() -> getMouZhuFlightPrice());
        Future<String> mouXieFlightPriceFuture = executor.submit(() -> getMouXieFlightPrice());
        Future<String> mouTuanFlightPriceFuture = executor.submit(() -> getMouTuanFlightPrice());
        List<Future<String>> futureList = new ArrayList<>();
        futureList.add(mouZhuFlightPriceFuture);
        futureList.add(mouXieFlightPriceFuture);
        futureList.add(mouTuanFlightPriceFuture);
        // 轮询,获取完成任务的返回结果
        while (taskSize > 0) {
            for (Future<String> future : futureList) {
                String result = null;
                try {
                    result = future.get(0, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    taskSize--;
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    taskSize--;
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    // 超时异常需要忽略,因为我们设置了等待时间为0,只要任务没有完成,就会报该异常
                }
                // 任务已经完成
                if (result != null) {
                    System.out.println("result=" + result);
                    // 从future列表中删除已经完成的任务
                    futureList.remove(future);
                    taskSize--;
                    // 此处必须break,否则会抛出并发修改异常。(也可以通过将futureList声明为CopyOnWriteArrayList类型解决)
                    break; // 进行下一次while循环
                }
            }
        }
    }

上述代码有两个小细节需要注意下:

  • 如采用ArrayList的话futureList删除之后需要break进行下一次while循环,否则会产生我们意想不到的ConcurrentModificationException异常。具体原因可看下《ArrayList的删除姿势你都掌握了吗》这个文章,里面有详细的介绍。
  • 在捕获了InterruptedExceptionExecutionException异常后记得 taskSize--否则就会发生死循环。如果生产发生了死循环你懂的,cpu被你打满,程序假死等。你离被开除也不远了。
  • 上面轮询future列表非常的复杂,而且还有很多异常需要处理,还有很多细节需要考虑,还有被开除的风险。所以这种方案也被pass了。

自定义BlockingQueue实现

  • 上述方案被pass之后,二胖就在思考可以借用哪种数据来实现下先进先出的功能,貌似队列可以实现下这个功能。所以二胖又写了一版采用队列来实现的功能。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  final static ExecutorService executor = new ThreadPoolExecutor(6, 6,
            0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Future<String> mouZhuFlightPriceFuture = executor.submit(() -> getMouZhuFlightPrice());
        Future<String> mouXieFlightPriceFuture = executor.submit(() -> getMouXieFlightPrice());
        Future<String> mouTuanFlightPriceFuture = executor.submit(() -> getMouTuanFlightPrice());

        // 创建阻塞队列
        BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3);
        executor.execute(() -> run(mouZhuFlightPriceFuture, blockingQueue));
        executor.execute(() -> run(mouXieFlightPriceFuture, blockingQueue));
        executor.execute(() -> run(mouTuanFlightPriceFuture, blockingQueue));
        // 异步保存所有机票价格
        for (int i = 0; i < 3; i++) {
            String result = blockingQueue.take();
            System.out.println(result);
            saveDb(result);
        }
    }

    private static void run(Future<String> flightPriceFuture, BlockingQueue<String> blockingQueue) {
        try {
            blockingQueue.put(flightPriceFuture.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
  • 这次比上个版本好多了,代码也简洁多了。不过按理说这种需求应该是大家经常遇到的,应该不需要自己来实现把,JAVA这么贴心的语言应该会有api可以直接拿来用吧。

CompletionService实现

  • 二胖现在毕竟也是对代码的简洁性有追求的人了。于是乎二胖去翻翻自己躺在书柜里吃灰的并发相关的书籍,看看是否有解决方案。

终于皇天不负有心人在二胖快要放弃的时候突然发现了新大陆。 Java并发编程实战》一书6.3.5CompletionService:ExecutorBlockingQueue,有这样一段话:

如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的Future,然后反复使用get方法,同时将参数timeout指定为0,从而通过轮询来判断任务是否完成。这种方法虽然可行,但却有些繁琐。幸运的是,还有一种更好的方法:完成服务CompletionService。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  final static ExecutorService executor = new ThreadPoolExecutor(6, 6,
            0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletionService completionService = new ExecutorCompletionService(executor);
        completionService.submit(() -> getMouZhuFlightPrice());
        completionService.submit(() -> getMouXieFlightPrice());
        completionService.submit(() -> getMouTuanFlightPrice());
        for (int i = 0; i < 3; i++) {
            String result = (String)completionService.take().get();
            System.out.println(result);
            saveDb(result);
        }
    }

当我们使用了CompletionService不用遍历future列表,也不需要去自定义队列了,代码变得简洁了。下面我们就来分析下CompletionService实现的原理吧。

CompletionService 介绍
  • 我们可以先看下JDK源码中CompletionServicejavadoc说明吧
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * A service that decouples the production of new asynchronous tasks
 * from the consumption of the results of completed tasks.  Producers
 * {@code submit} tasks for execution. Consumers {@code take}
 * completed tasks and process their results in the order they
 * complete.

大概意思是CompletionService实现了生产者提交任务和消费者获取结果的解耦,生产者和消费者都不用关心任务的完成顺序,由CompletionService来保证,消费者一定是按照任务完成的先后顺序来获取执行结果。

成员变量

既然需要按照任务的完成顺序获取结果,那内部应该也是通过队列来实现的吧。打开源码我们可以看到,里面有三个成员变量

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class ExecutorCompletionService<V> implements CompletionService<V> {
 // 执行task的线程池,创建CompletionService必须指定;
    private final Executor executor;
    //主要用于创建待执行task;
    private final AbstractExecutorService aes;
    //存储已完成状态的task,默认是基于链表结构的阻塞队列LinkedBlockingQueue。     
    private final BlockingQueue<Future<V>> completionQueue;
任务提交

ExecutorCompletionService任务的提交和执行都是委托给Executor来完成。当提交某个任务时,该任务首先将被包装为一个QueueingFuture

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }
任务完成后何时进入队列

从源码可以看出,QueueingFutureFutureTask的子类,实现了done方法,在task执行完成之后将当前task添加到completionQueue,将返回结果加入到阻塞队列中,加入的顺序就是任务完成的先后顺序。done方法的具体调用在FutureTaskfinishCompletion方法。

获取已完成任务
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

takepoll都是调用BlockingQueue提供的方法。

  • take() 获取任务阻塞,直到可以拿到任务为止。
  • poll() 获取任务不阻塞,如果没有获取到任务直接返回null
  • poll(long timeout, TimeUnit unit) 带超时时间等待的获取任务方法(一般推荐使用这种

总结

  • CompletionService 把线程池 Executor 和阻塞队列 BlockingQueue融合在一起,能够让批异步任务的管理更简单,将生产者提交任务和消费者获取结果的解耦。
  • CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,我们可以轻松实现后续处理的有序性,避免无谓的等待。

结束

  • 由于自己才疏学浅,难免会有纰漏,假如你发现了错误的地方,还望留言给我指出来,我会对其加以修正。
  • 如果你觉得文章还不错,你的转发、分享、赞赏、点赞、留言就是对我最大的鼓励。
  • 感谢您的阅读,十分欢迎并感谢您的关注。

参考 《java并发编程实战》 https://www.jianshu.com/p/19093422dd57 https://blog.csdn.net/cbjcry/article/details/84222853 https://www.jianshu.com/p/493ae1b107e4

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
还不懂 ConcurrentHashMap ?这份源码分析了解一下
上一篇文章介绍了 HashMap 源码,反响不错,也有很多同学发表了自己的观点,这次又来了,这次是 ConcurrentHashMap 了,作为线程安全的HashMap ,它的使用频率也是很高。那么它的存储结构和实现原理是怎么样的呢?
未读代码
2020/04/21
4760
还不懂 ConcurrentHashMap ?这份源码分析了解一下
JDK1.7ConcurrentHashMap源码解析
我们都知道HashMap在多线程情况下,在put的时候,插入的元素超过了容量(由负载因子决定)的范围就会触发扩容操作,就是rehash,这个会重新将原数组的内容重新hash到新的扩容数组中,在多线程的环境下,存在同时其他的元素也在进行put操作,如果hash值相同,可能出现同时在同一数组下用链表表示,造成闭环,导致在get时会出现死循环,所以HashMap是线程不安全的。
黑洞代码
2021/02/09
4410
HashMap, ConcurrentHashMap 原理及源码,一次性讲清楚!
网上关于 HashMap 和 ConcurrentHashMap 的文章确实不少,不过缺斤少两的文章比较多,所以才想自己也写一篇,把细节说清楚说透,尤其像 Java8 中的 ConcurrentHashMap,大部分文章都说不清楚。
Java技术栈
2018/12/29
6470
我是这样给阿里面试官吹 ConcurrentHashMap的
因为上篇文章HashMap已经讲解的很详细了,因此此篇文章会简单介绍思路,再学习并发HashMap就简单很多了,上一篇文章中我们最终知道HashMap是线程不安全的,因此在老版本JDK中提供了HashTable来实现多线程级别的,改变之处重要有以下几点。
浅羽技术
2020/12/07
8870
我是这样给阿里面试官吹 ConcurrentHashMap的
HashMap? ConcurrentHashMap? 相信看完这篇没人能难住你!
Map 这样的 Key Value 在软件开发中是非常经典的结构,常用于在内存中存放数据。
纯洁的微笑
2018/08/16
9790
HashMap? ConcurrentHashMap? 相信看完这篇没人能难住你!
juc系列-ConcurrentHashMap
ConcurrentHashMap和HashMap一样都是基于散列的容器,ConcurrentHashMap可以认为是一种线程安全HashMap,它使用了一中完全不同的加锁策略提高并发性和伸缩性。 ConcurrentHashMap并不是将每个方法在同一个锁上同步并使得每次只能有一个线程访问容器,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制称为“分段锁”。
topgunviper
2022/05/12
4020
juc系列-ConcurrentHashMap
CurrentHashMap原理从7到8
HashMap在并发执行put会引起死循环,是因为多线程会导致HashMap的Entry链表成环,一旦成环,Entry的next节点永远不为空,产生死循环 而
JavaEdge
2018/05/16
4.8K1
ConcurrentHashMap 源码深度解析(java7)——原来如此简单(写的真好,建议收藏)
ConcurrentHashMap算是java基础中非常基本的知识点,不仅在日常开发中经常用到,面试中也是经久不衰的话题。它基本沿用HashMap的接口定义,使得即使不了解其底层原理,也能无缝切换。
100000860378
2021/04/19
6630
ConcurrentHashMap 源码深度解析(java7)——原来如此简单(写的真好,建议收藏)
1.7 的ConcurrentHashMap要得不
在多线程的情况下我们都知道尽可能不要加锁,程序一旦加锁非常影响性能,而ConcurrentHashMap大量的使用了cas操作来避免加锁带来的性能开销,而使用cas就需要用到Unsafe类
余生大大
2022/11/02
6550
1.7 的ConcurrentHashMap要得不
JDK 7 ConcurrentHashMap
JDK1.7中的ConcurrentHashMap间接地实现了Map,并将每一个元素称为分段锁segment,每个segment都是一个HashEntry<K,V>数组,称为table,table的每个元素都是一个HashEntry的单向队列。
一个风轻云淡
2023/10/15
2180
JDK 7 ConcurrentHashMap
JDK 7 ConcurrentHashMap源码解读
HashMap存在并发问题,jdk有提供HashTable,这个HashTable是对HashMap中的所以方法加锁以达到线程安全,但是,这种方式会使得性能下降,看下面的图,假如有两个线程分别要put kk3和kk4,第一个线程最快,它对kk3进行put操作,这时另一个线程要put kk4就要等待,问题是,这两个元素所要put的位置,互不相干,但还是需要等待,这造成了一种资源浪费,所以才会出现ConcurrentHashMap,它分段式加锁,就能很大程度上避免下面的情况。
用针戳左手中指指头
2021/01/29
4000
JDK 7 ConcurrentHashMap源码解读
Java 并发(9)ConcurrentHashMap 源码分析
我们知道哈希表是一种非常高效的数据结构,设计优良的哈希函数可以使其上的增删改查操作达到 O (1) 级别。Java 为我们提供了一个现成的哈希结构,那就是 HashMap 类,在前面的文章中我曾经介绍过 HashMap 类,知道它的所有方法都未进行同步,因此在多线程环境中是不安全的。为此,Java 为我们提供了另外一个 HashTable 类,它对于多线程同步的处理非常简单粗暴,那就是在 HashMap 的基础上对其所有方法都使用 synchronized 关键字进行加锁。
JavaFish
2020/04/15
7140
Java 并发(9)ConcurrentHashMap 源码分析
我们来说一说 ConcurrentHashMap 中十个提升性能的细节
如何在高并发下提高系统吞吐是所有后端开发者追求的目标,Java并发的开创者Doug Lea在Java 7 ConcurrentHashMap的设计中给出了一些参考答案,这里总结了ConcurrentHashMap源码中影响并发性能的十个细节,有常见的自旋锁,CAS的使用,也有延迟写内存,volatile语义退化等不常见的技巧。 由于ConcurrentHashMap的内容比较多,而且Java 7和Java 8两个版本的实现相差比较大。 《阿里巴巴Java开发手册》的作者孤尽对ConcurrentHashMap的设计十分推崇,他说:“ConcurrentHashMap源码是学习Java代码开发规范的一个非常好的学习材料,我建议同学们可以时常去看一看,总会有新的收获的”,相信大家平常也能听到很多对于ConcurrentHashMap设计的溢美之词,在展开隐藏在ConcurrentHashMap所有小秘密之前,在大脑中首先要有这样的一幅图:
程序员小假
2025/06/13
1710
我们来说一说 ConcurrentHashMap 中十个提升性能的细节
ConcurrentHashMap (JDK7) 详解
数据结构 ConcurrentHashMap 实现并发操作的原理 使用了锁分段技术:ConcurrentHashMap持有一组锁(segment[]),并将数据尽可能分散在不同的锁段中(即,每个锁只会
tomas家的小拨浪鼓
2018/06/27
1.8K0
ConcurrentHashMap实现原理
HashMap不是线程安全的,在多线程环境下可以使用Hashtable和ConcurrentHashMap,Hashtable实现线程安全的方式是用synchronized修饰方法,如get和put方法都是用synchronized修饰的,使用的是对象锁,这样会导致线程1get元素(或者put元素)时,线程2不能get元素和put元素,在竞争激烈的时候会出现严重的性能问题
Java识堂
2019/08/13
4950
Java 集合源码分析(一)HashMap
目录 Java 集合源码分析(一)HashMap 1. 概要 2. JDK 7 的 HashMap 3. JDK 1.8 的 HashMap 4. Hashtable 5. JDK 1.7 的 Con
希希里之海
2019/08/29
4990
Java并发指南13:Java 中的 HashMap 和 ConcurrentHashMap 全解析
本文是微信公众号【Java技术江湖】的《Java并发指南》其中一篇,本文大部分内容来源于网络,为了把本文主题讲得清晰透彻,也整合了很多我认为不错的技术博客内容,引用其中了一些比较好的博客文章,如有侵权,请联系作者。
Java技术江湖
2019/11/20
6750
Java并发指南13:Java 中的 HashMap 和 ConcurrentHashMap 全解析
Java并发容器--ConcurrentHashMap
  1、不安全:大家都知道HashMap不是线程安全的,在多线程环境下,对HashMap进行put操作会导致死循环。是因为多线程会导致Entry链表形成环形数据结构,这样Entry的next节点将永远不为空,就会产生死循环获取Entry。具体内容见HashMap随笔。
在周末
2019/09/11
5120
详解Java并发编程利器:ConcurrentHashMap
咦咦咦,各位小可爱,我是你们的好伙伴——bug菌,今天又来给大家普及Java SE相关知识点了,别躲起来啊,听我讲干货还不快点赞,赞多了我就有动力讲得更嗨啦!所以呀,养成先点赞后阅读的好习惯,别被干货淹没了哦~
bug菌
2024/08/05
2530
详解Java并发编程利器:ConcurrentHashMap
Java 集合源码解析 - ConcurrentHashMap(JDK7)
HashMap在并发执行put会引起死循环,是因为多线程会导致HashMap的Entry链表成环,一旦成环,Entry的next节点永远不为空,产生死循环
JavaEdge
2022/11/30
9050
Java 集合源码解析 - ConcurrentHashMap(JDK7)
推荐阅读
相关推荐
还不懂 ConcurrentHashMap ?这份源码分析了解一下
更多 >
交个朋友
加入腾讯云官网粉丝站
蹲全网底价单品 享第一手活动信息
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验