首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >多线程获取结果还在使用Future轮询获取结果吗?CompletionService快来了解下吧。

多线程获取结果还在使用Future轮询获取结果吗?CompletionService快来了解下吧。

原创
作者头像
java金融
修改于 2020-07-09 02:00:20
修改于 2020-07-09 02:00:20
1.5K00
代码可运行
举报
文章被收录于专栏:java金融java金融
运行总次数:0
代码可运行

背景

二胖上次写完参数校验(《二胖写参数校验的坎坷之路》)之后,领导一直不给他安排其他开发任务,就一直让他看看代码熟悉业务。二胖每天上班除了偶尔跟坐在隔壁的前端小姐姐聊聊天,就是看看这写枯燥无味的业务代码,无聊的一匹。虽然二胖已是久经职场的老油条了,但是看到同事们的周报都写的满满的,而自己的周报,就一两行,熟悉了什么功能。心里还是慌得一匹,毕竟公司不养闲人啊。于是乎二胖终于鼓起勇气为了向领导表明自己的上进心,主动向领导要开发任务。领导一看这小伙子这么有上进心,于是就到任务看板里面挑了一个业务逻辑比较简单的任务分配给了二胖。二胖拿到这个任务屁颠屁颠的回到座位。任务比较简单,就是通过爬虫去爬取某些卖机票(某猪、某携、某团等)的网站的一些机票,然后保存到数据库。

同步入库

二胖拿到任务,三下五除二就把任务完成了。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 public static void main(String[] args) throws InterruptedException {
        String mouZhuFlightPrice = getMouZhuFlightPrice();
        String mouXieFlightPrice = getMouXieFlightPrice();
        String mouTuanFlightPrice = getMouTuanFlightPrice();
        saveDb(mouZhuFlightPrice);
        saveDb(mouXieFlightPrice);
        saveDb(mouTuanFlightPrice);
    }


    /**
     * 模拟请求某猪网站 爬取机票信息
     *
     *
     * @return
     * @throws InterruptedException
     */
    public static String getMouZhuFlightPrice() throws InterruptedException {
        // 模拟请求某猪网站 爬取机票信息
        Thread.sleep(10000);
        return "获取到某猪网站的机票信息了";
    }

    /**
     * 模拟请求某携网站 爬取机票信息
     *
     * @return
     * @throws InterruptedException
     */
    public static String getMouXieFlightPrice() throws InterruptedException {
        // 模拟请求某携网站 爬取机票信息
        Thread.sleep(5000);
        return "获取到某携网站的机票信息了";
    }


    /**
     * 模拟请求团网站 爬取机票信息
     *
     * @return
     * @throws InterruptedException
     */
    public static String getMouTuanFlightPrice() throws InterruptedException {
        // 模拟请求某团网站 爬取机票信息
        Thread.sleep(3000);
        return "获取到某团网站的机票信息了";
    }

    /**
     * 保存DB
     *
     * @param flightPriceList
     */
    public static void saveDb(String flightPriceList) {
            // 解析字符串 进行异步入库
    }

这次二胖学乖了,任务完成了先去找下坐他对面的技术大拿(看他那发际线就知道了)同事“二狗”让二狗大拿帮忙指点一二,看看代码是否还能有优化的地方。毕竟领导对代码的性能、以及代码的优雅是有要求的。领导多次在部门的周会上提到让我们多看看“二狗”写的代码,学习下人家写代码的优雅、抽象、封装等等。二狗大概的瞄了下二胖写的代码,提出了个小小的建议“这个代码可以采用多线程来优化下哦,你看某猪(CVM)这个网站耗时是拿到结果需要10s,其他的耗时都比它短,先有结果的我们可以先处理的,不需要等到大家都返回了再来处理的”。

轮循futureList获取结果

幸好二胖对多线程了解一点点,于是乎采用future的方式来实现。二胖使用一个List来保存每个任务返回的Future,然后去轮询这些Future,直到每个Future都已完成。由于需要先完成的任务需要先执行,且不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,所以在调用get方式时,需要将超时时间设置为0

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  public static void main(String[] args) {
        int taskSize = 3;
        Future<String> mouZhuFlightPriceFuture = executor.submit(() -> getMouZhuFlightPrice());
        Future<String> mouXieFlightPriceFuture = executor.submit(() -> getMouXieFlightPrice());
        Future<String> mouTuanFlightPriceFuture = executor.submit(() -> getMouTuanFlightPrice());
        List<Future<String>> futureList = new ArrayList<>();
        futureList.add(mouZhuFlightPriceFuture);
        futureList.add(mouXieFlightPriceFuture);
        futureList.add(mouTuanFlightPriceFuture);
        // 轮询,获取完成任务的返回结果
        while (taskSize > 0) {
            for (Future<String> future : futureList) {
                String result = null;
                try {
                    result = future.get(0, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    taskSize--;
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    taskSize--;
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    // 超时异常需要忽略,因为我们设置了等待时间为0,只要任务没有完成,就会报该异常
                }
                // 任务已经完成
                if (result != null) {
                    System.out.println("result=" + result);
                    // 从future列表中删除已经完成的任务
                    futureList.remove(future);
                    taskSize--;
                    // 此处必须break,否则会抛出并发修改异常。(也可以通过将futureList声明为CopyOnWriteArrayList类型解决)
                    break; // 进行下一次while循环
                }
            }
        }
    }

上述代码有两个小细节需要注意下:

  • 如采用ArrayList的话futureList删除之后需要break进行下一次while循环,否则会产生我们意想不到的ConcurrentModificationException异常。具体原因可看下《ArrayList的删除姿势你都掌握了吗》这个文章,里面有详细的介绍。
  • 在捕获了InterruptedExceptionExecutionException异常后记得 taskSize--否则就会发生死循环。如果生产发生了死循环你懂的,cpu被你打满,程序假死等。你离被开除也不远了。
  • 上面轮询future列表非常的复杂,而且还有很多异常需要处理,还有很多细节需要考虑,还有被开除的风险。所以这种方案也被pass了。

自定义BlockingQueue实现

  • 上述方案被pass之后,二胖就在思考可以借用哪种数据来实现下先进先出的功能,貌似队列可以实现下这个功能。所以二胖又写了一版采用队列来实现的功能。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  final static ExecutorService executor = new ThreadPoolExecutor(6, 6,
            0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Future<String> mouZhuFlightPriceFuture = executor.submit(() -> getMouZhuFlightPrice());
        Future<String> mouXieFlightPriceFuture = executor.submit(() -> getMouXieFlightPrice());
        Future<String> mouTuanFlightPriceFuture = executor.submit(() -> getMouTuanFlightPrice());

        // 创建阻塞队列
        BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3);
        executor.execute(() -> run(mouZhuFlightPriceFuture, blockingQueue));
        executor.execute(() -> run(mouXieFlightPriceFuture, blockingQueue));
        executor.execute(() -> run(mouTuanFlightPriceFuture, blockingQueue));
        // 异步保存所有机票价格
        for (int i = 0; i < 3; i++) {
            String result = blockingQueue.take();
            System.out.println(result);
            saveDb(result);
        }
    }

    private static void run(Future<String> flightPriceFuture, BlockingQueue<String> blockingQueue) {
        try {
            blockingQueue.put(flightPriceFuture.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
  • 这次比上个版本好多了,代码也简洁多了。不过按理说这种需求应该是大家经常遇到的,应该不需要自己来实现把,JAVA这么贴心的语言应该会有api可以直接拿来用吧。

CompletionService实现

  • 二胖现在毕竟也是对代码的简洁性有追求的人了。于是乎二胖去翻翻自己躺在书柜里吃灰的并发相关的书籍,看看是否有解决方案。

终于皇天不负有心人在二胖快要放弃的时候突然发现了新大陆。 《Java并发编程实战》一书6.3.5CompletionService:ExecutorBlockingQueue,有这样一段话:

如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的Future,然后反复使用get方法,同时将参数timeout指定为0,从而通过轮询来判断任务是否完成。这种方法虽然可行,但却有些繁琐。幸运的是,还有一种更好的方法:完成服务CompletionService。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  final static ExecutorService executor = new ThreadPoolExecutor(6, 6,
            0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletionService completionService = new ExecutorCompletionService(executor);
        completionService.submit(() -> getMouZhuFlightPrice());
        completionService.submit(() -> getMouXieFlightPrice());
        completionService.submit(() -> getMouTuanFlightPrice());
        for (int i = 0; i < 3; i++) {
            String result = (String)completionService.take().get();
            System.out.println(result);
            saveDb(result);
        }
    }

当我们使用了CompletionService不用遍历future列表,也不需要去自定义队列了,代码变得简洁了。下面我们就来分析下CompletionService实现的原理吧。

CompletionService 介绍
  • 我们可以先看下JDK源码中CompletionServicejavadoc说明吧
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * A service that decouples the production of new asynchronous tasks
 * from the consumption of the results of completed tasks.  Producers
 * {@code submit} tasks for execution. Consumers {@code take}
 * completed tasks and process their results in the order they
 * complete.

大概意思是CompletionService实现了生产者提交任务和消费者获取结果的解耦,生产者和消费者都不用关心任务的完成顺序,由CompletionService来保证,消费者一定是按照任务完成的先后顺序来获取执行结果。

成员变量

既然需要按照任务的完成顺序获取结果,那内部应该也是通过队列来实现的吧。打开源码我们可以看到,里面有三个成员变量

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class ExecutorCompletionService<V> implements CompletionService<V> {
 // 执行task的线程池,创建CompletionService必须指定;
    private final Executor executor;
    //主要用于创建待执行task;
    private final AbstractExecutorService aes;
    //存储已完成状态的task,默认是基于链表结构的阻塞队列LinkedBlockingQueue。     
    private final BlockingQueue<Future<V>> completionQueue;
任务提交

ExecutorCompletionService任务的提交和执行都是委托给Executor来完成。当提交某个任务时,该任务首先将被包装为一个QueueingFuture

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }
任务完成后何时进入队列

从源码可以看出,QueueingFutureFutureTask的子类,实现了done方法,在task执行完成之后将当前task添加到completionQueue,将返回结果加入到阻塞队列中,加入的顺序就是任务完成的先后顺序。done方法的具体调用在FutureTaskfinishCompletion方法。

获取已完成任务
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

takepoll都是调用BlockingQueue提供的方法。

  • take() 获取任务阻塞,直到可以拿到任务为止。
  • poll() 获取任务不阻塞,如果没有获取到任务直接返回null
  • poll(long timeout, TimeUnit unit) 带超时时间等待的获取任务方法(一般推荐使用这种

总结

  • CompletionService 把线程池 Executor 和阻塞队列 BlockingQueue融合在一起,能够让批异步任务的管理更简单,将生产者提交任务和消费者获取结果的解耦。
  • CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,我们可以轻松实现后续处理的有序性,避免无谓的等待。

结束

  • 由于自己才疏学浅,难免会有纰漏,假如你发现了错误的地方,还望留言给我指出来,我会对其加以修正。
  • 如果你觉得文章还不错,你的转发、分享、赞赏、点赞、留言就是对我最大的鼓励。
  • 感谢您的阅读,十分欢迎并感谢您的关注。

参考 《java并发编程实战》 https://www.jianshu.com/p/19093422dd57 https://blog.csdn.net/cbjcry/article/details/84222853 https://www.jianshu.com/p/493ae1b107e4

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
史上最强 Java Solon v3.2.0 发布 并发性能提升 700% 内存节省 50%
我将先介绍Java Solon v3.2.0的核心特性,再给出技术方案示例,最后列举实际应用案例,让你能全面了解其使用方法和优势。
啦啦啦191
2025/06/17
1470
史上最强 Java Solon v3.2.0 发布 并发性能提升 700% 内存节省 50%
校招 Java 面试高频知识点深度解析与实战案例全攻略
在AI驱动的数字化转型浪潮中,Java开发岗位的面试要求持续升级。本文结合最新技术趋势,深度解析校招Java面试的核心知识点,并通过真实业务场景的实战案例,帮助同学们掌握从Java基础到云原生技术栈的完整体系。
啦啦啦191
2025/07/02
1280
校招 Java 面试高频知识点深度解析与实战案例全攻略
Java 项目实操高并发电商系统核心模块实现教程之关键技术与长尾案例解析
为帮助你在面试中展现技术深度和解决问题的能力,以下通过一个高并发电商系统的商品模块实战案例,结合最新技术栈,展示从设计到落地的完整过程。
啦啦啦191
2025/07/13
1120
Java 项目实操高并发电商系统核心模块实现教程之关键技术与长尾案例解析
如何设计一个秒杀系统,(高并发高可用分布式集群)
设计一个高并发、高可用的分布式秒杀系统是一个非常具有挑战性的任务,需要从架构、数据库、缓存、并发控制、降级限流等多个维度进行考虑。以下是一个典型的秒杀系统设计思路:
小马哥学JAVA
2024/07/04
3790
Spring Boot 3.x 现代化企业级应用开发实战与最佳实践
我将基于最新的Spring Boot 3.x版本,结合微服务、云原生、响应式编程等前沿技术,为您提供一个现代化的Spring Boot项目实操指南。
啦啦啦191
2025/06/18
2090
Spring Boot 3.x 现代化企业级应用开发实战与最佳实践
缓存穿透、击穿、雪崩的成因及解决方案
缓存击穿的成因 缓存击穿是指在高并发场景下,某个热点数据的缓存突然失效(如缓存过期),而这时恰好有大量的并发请求来访问这个刚刚失效的key,所有请求都无法从缓存中获取到数据,进而都涌向数据库,导致数据库瞬时压力过大,这就是所谓的“击穿”。尤其是在数据更新并不频繁的情况下,这种集中性的数据库查询压力可能导致数据库响应变慢,甚至宕机。 解决方案 - Java代码示例(使用Redis分布式锁) 下面是一个基于Redis实现分布式锁,用于解决缓存击穿问题的基本Java代码框架: import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.RedisScript; import java.util.Collections; @Service public class CacheService { private final StringRedisTemplate redisTemplate; private final RedisScript<Long> luaLockScript; public CacheService(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; luaLockScript = new DefaultRedisScript<>(// 定义Lua脚本,用于获取分布式锁 "if redis.call('exists', KEYS[1]) == 0 then " + "redis.call('hset', KEYS[1], ARGV[1], 1);" + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 1; " + "end;" + "return 0;", Long.class); } public Object getDataFromDBWithLock(String cacheKey) { Boolean locked = acquireLock(cacheKey, "uniqueId"); // 尝试获取锁 if (locked) { try { // 如果获取到锁,则尝试从缓存中获取数据 Object data = getDataFromCache(cacheKey); if (data != null) { return data; } // 缓存未命中,从数据库加载数据 data = loadFromDatabase(cacheKey); // 将数据写入缓存 writeToCache(cacheKey, data); return data; } finally { releaseLock(cacheKey, "uniqueId"); // 无论何时,都要确保最后释放锁 } } else { // 没有获取到锁,等待其他线程完成数据库操作后从缓存中读取 return getDataFromCacheAfterWait(cacheKey); } } private Boolean acquireLock(String key, String uniqueId) { // 调用Lua脚本获取分布式锁,这里假设expireTime是你设置的锁超时时间 Long result = redisTemplat
用户7353950
2024/05/10
2040
缓存穿透、击穿、雪崩的成因及解决方案
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
电商秒杀场景具有瞬时高并发、资源竞争激烈和数据一致性要求高三大特征。当数万用户同时抢购少量商品时(如1000件商品承受10万QPS),系统面临多重挑战:
大熊计算机
2025/07/15
1310
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
面试题解析:如何解决分布式秒杀系统中的库存超卖问题?
在构建分布式秒杀系统时,一个常见的挑战是如何防止库存超卖问题。当多个用户同时抢购同一商品时,如果不加以控制,可能导致库存出现负数,影响系统的稳定性和用户体验。本文将讨论这个问题,并提供一种综合的解决方案。
GeekLiHua
2025/01/21
4870
Spring Boot 3 微服务架构实战 云原生电商平台全流程构建
通过上述技术组合,我们可以构建一个现代化、高可用的云原生电商平台。未来发展方向包括:
啦啦啦191
2025/06/20
1580
Spring Boot 3 微服务架构实战 云原生电商平台全流程构建
SpringBoot实现并发、超发和锁机制/抢购示例:超发、乐观锁、悲观锁和Redis的使用
上述的超发现象,归根到底在于数据库时被多个线程同时访问的,在没有加锁的情况下,上述代码并不是线程安全的。
用户10175992
2022/11/15
1.1K0
SpringBoot实现并发、超发和锁机制/抢购示例:超发、乐观锁、悲观锁和Redis的使用
基于代码实操SpringBoot、Redis、LUA秒杀系统!
本文主要目的还是用代码实现一下防止商品超卖的功能,所以像制定秒杀计划,展示商品等功能就不着重写了。
Java程序猿
2021/02/20
1K0
最新 Java 从入门到实战技术实操指南
以下是结合最新技术的Java实操内容,涵盖从基础到微服务架构的完整学习路径和应用实例:
啦啦啦191
2025/06/20
910
最新 Java 从入门到实战技术实操指南
捣鼓一个电商功能设计
谷歌系统设计面试有一道题是关于如何设计秒杀架构,国外一位老哥给出了5种方法,下图是其中一种。
JavaSouth南哥
2024/10/16
2050
捣鼓一个电商功能设计
飞算JavaAI:开启 Java 开发 “人机协作” 新纪元
在Java开发的日常工作中,开发者常常陷入两难:追求效率可能牺牲代码质量,注重质量又难免拖慢进度。重复的编码工作消耗大量精力,复杂业务逻辑稍不留意就会埋下漏洞,老系统重构更是如同在钢丝上行走——牵一发而动全身。飞算JavaAI的出现,并非要取代开发者,而是构建了一种全新的协作模式:让AI处理机械性工作,让人专注于核心创意与决策。本文将从开发全流程的角度,结合实际案例与技术细节,阐述飞算JavaAI如何成为开发者的“智能搭档”,而非简单的“代码生成工具”。
Pocker_Spades_A
2025/07/14
1310
飞算JavaAI:开启 Java 开发 “人机协作” 新纪元
2025 年华为 Java 面试宝典:最新面试题及答案解析汇总
Java面试涵盖多个方面的高频考点,包括Java基础、并发编程、JVM、框架、分布式等,以下是详细介绍:
啦啦啦191
2025/07/05
1330
2025 年华为 Java 面试宝典:最新面试题及答案解析汇总
Java高并发秒杀API(四)之高并发优化
并发性上不去是因为当多个线程同时访问一行数据时,产生了事务,因此产生写锁,每当一个获取了事务的线程把锁释放,另一个排队线程才能拿到写锁,QPS(Query Per Second每秒查询率)和事务执行的时间有密切关系,事务执行时间越短,并发性越高,这也是要将费时的I/O操作移出事务的原因。
雨临Lewis
2022/01/11
1.5K0
Java高并发秒杀API(四)之高并发优化
【Redis】SpringBoot集成Redis分布式锁以及Redis缓存
第二个spring-boot-start-cache的依赖,是使用缓存注解需要的,我在项目中没有引入。 因为我在websocket中已经引入了。 查询依赖关系 ctrl+shift+alt+u 快捷键(也可以在pom.xml文件上右键->Maven->Show Dependencies…)查询maven包依赖引入关系,ctrl+f搜索包
谙忆
2021/01/21
9820
2025 年 Java 面试宝典社招春招秋招实操全攻略
以下是一份基于相关技术平台文章整合的2025年Java面试宝典内容,包含技术方案和应用实例,帮助你应对社招、春招和秋招:
啦啦啦191
2025/06/26
880
2025 年 Java 面试宝典社招春招秋招实操全攻略
处理Redis与MySQL数据不一致的Java定期巡检方案
假设我们有一个电商秒杀系统,商品库存信息存储在MySQL数据库中,同时使用Redis缓存了库存信息。由于高并发的秒杀场景,可能导致Redis和MySQL中的库存数据不一致。
GeekLiHua
2025/01/21
830
Redis高并发分布式锁详解
  1.为了解决Java共享内存模型带来的线程安全问题,我们可以通过加锁来保证资源访问的单一,如JVM内置锁synchronized,类级别的锁ReentrantLock。
忧愁的chafry
2022/10/30
1.2K0
Redis高并发分布式锁详解
推荐阅读
相关推荐
史上最强 Java Solon v3.2.0 发布 并发性能提升 700% 内存节省 50%
更多 >
交个朋友
加入[腾讯云] DeepSeek开发者交流群
前沿技术深度讨论 发展开发者人脉圈
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档