我正在玩Java 8可完成的未来。我有以下代码:
CountDownLatch waitLatch = new CountDownLatch(1);
CompletableFuture<?> future = CompletableFuture.runAsync(() -> {
try {
System.out.println("Wait");
waitLatch.await(); //cancel should interrupt
System.out.println("Done");
} catch (InterruptedException e) {
System.out.println("Interrupted");
throw new RuntimeException(e);
}
});
sleep(10); //give it some time to start (ugly, but works)
future.cancel(true);
System.out.println("Cancel called");
assertTrue(future.isCancelled());
assertTrue(future.isDone());
sleep(100); //give it some time to finish使用runAsync,我调度等待锁存的代码的执行。接下来,我取消了未来,希望在里面抛出一个被中断的异常。但是,似乎线程在等待调用时仍然被阻塞,即使未来被取消(断言传递),InterruptedException也不会被抛出。使用ExecutorService的等效代码按预期工作。它是CompletableFuture中的一个bug还是我的示例中的一个bug?
发布于 2014-04-27 21:38:31
很明显这是故意的。CompletableFuture::cancel方法的Javadoc声明:
参数: mayInterruptIfRunning -此值在此实现中没有无效应,因为中断不用于控制处理。
有趣的是,ForkJoinTask::cancel方法对参数mayInterruptIfRunning使用了几乎相同的措辞。
我在这个问题上有一个猜想:
而不是阻塞,CompletableFuture应该创建一个新的CompletionStage,而cpu绑定的任务是叉连接模型的先决条件。因此,对他们中的任何一个使用中断都会使他们的目的落空。另一方面,它可能会增加复杂性,如果按预期使用,这不是必需的。
发布于 2017-06-06 18:28:54
当您调用CompletableFuture#cancel时,您只会停止链的下游部分。上游部分,即最终将称为complete(...)或completeExceptionally(...)的部分,不会得到任何信号表明不再需要结果。
那些‘上游’和‘下游’的东西是什么?
让我们考虑以下代码:
CompletableFuture
.supplyAsync(() -> "hello") //1
.thenApply(s -> s + " world!") //2
.thenAccept(s -> System.out.println(s)); //3在这里,数据从上到下--从供应商创建、通过函数修改到由println使用。上述特定步骤的部分称为上游,下面的部分称为下游。例如,步骤1和步骤2是步骤3的上游。
下面是幕后发生的事情。这不是精确的,相反,它是一个方便的心智模型,正在发生的事情。
ForkJoinPool中)。complete(...)将供应商的结果传递给下一个CompletableFuture下游。CompletableFuture调用下一步--一个函数(步骤2),它接受前一步的结果,并返回将进一步传递给下游CompletableFuture的complete(...)的内容。CompletableFuture调用使用者System.out.println(s)。消费者完成后,下游CompletableFuture将收到它的值,(Void) null正如我们所看到的,这个链中的每个CompletableFuture都必须知道下游谁在等待将值传递给他们的complete(...) (或completeExceptionally(...))。但是CompletableFuture不需要知道任何关于它的上游(或上游-可能有几个)。
因此,在步骤3中调用cancel() 的不会中止步骤1和2,因为从步骤3到步骤2之间没有链接。
假设如果您使用的是CompletableFuture,那么您的步骤就足够小,所以如果执行一些额外的步骤,就不会有什么害处。
如果希望向上游传播取消,则有两个选项:
CompletableFuture (将其命名为cancelled),它将在每一步后进行检查(类似于step.applyToEither(cancelled, Function.identity()))。发布于 2020-12-01 19:09:46
如果您实际上希望能够取消一个任务,那么您必须使用Future本身(例如,由ExecutorService.submit(Callable<T>)返回,而不是CompletableFuture返回)。正如nosid在回答中指出的那样,CompletableFuture完全忽略了对cancel(true)的任何调用。
我怀疑JDK团队没有实现中断,因为:
InputStream.read()的调用正在阻塞调用!( JDK团队也没有计划让标准的I/O系统再次可中断,就像Java早期那样。)Object.finalize()、Object.wait()、Thread.stop()等)逐步淘汰旧的坏API。我相信Thread.interrupt()被认为属于最终必须被废弃和替换的一类。因此,较新的API(如ForkJoinPool和CompletableFuture)已经不支持它了。CompletableFuture是为构建DAG结构的操作管道而设计的,类似于Java .简洁地描述数据流DAG的一个节点的中断如何影响DAG其余部分的执行是非常困难的。(当任何节点被中断时,是否应该立即取消所有并发任务?)一种非常麻烦的方法是让每个CompletableFuture导出对外部可见AtomicReference的引用,然后在需要时从另一个外部线程直接中断Thread引用。或者,如果您使用自己的ExecutorService启动所有任务,在您自己的ThreadPool中,您可以手动中断任何或所有已启动的线程,即使CompletableFuture拒绝通过cancel(true)触发中断。(请注意,尽管CompletableFuture lambda不能抛出选中的异常,所以如果CompletableFuture中有可中断的等待,则必须作为未检查的异常重新抛出。)
更简单地说,您只需在外部作用域中声明一个AtomicReference<Boolean> cancel = new AtomicReference<>(),并定期从每个CompletableFuture任务的lambda内部检查此标志。
您还可以尝试设置Future实例的DAG,而不是CompletableFuture实例的DAG,这样您就可以准确地指定一个任务中的异常和中断/取消如何影响另一个当前正在运行的任务。I show how to do this in my example code in my question here,它工作得很好,但它有很多样板。
https://stackoverflow.com/questions/23320407
复制相似问题