1. 对象的创建及完成。
static void complete() throws ExecutionException, InterruptedException {
CompletableFuture<String> f = new CompletableFuture<>();
new Thread() {
@Override
public void run() {
// 该方法会将结果传给CompletableFuture,并将其设置为完成状态
// 一般是异步调用
f.complete("hello");
}
}.start();
System.out.println(f.get()); // 输出 hello
}
2. 异步等待CompletableFuture的完成,并回调方法。
static void whenComplete() {
CompletableFuture<String> f = new CompletableFuture<>();
f.whenComplete(
new BiConsumer<>() {
@Override
public void accept(String s, Throwable throwable) {
if (throwable != null) {
throwable.printStackTrace();
} else {
System.out.println(s);
}
}
});
// f.complete("hello"); // 正常完成
f.completeExceptionally(new RuntimeException()); // 异常完成
}
3. 设置完成的超时时间。
static void timeout() throws InterruptedException {
CompletableFuture<String> f = new CompletableFuture<>();
f.whenComplete(
new BiConsumer<>() {
@Override
public void accept(String s, Throwable throwable) {
if (throwable != null) {
throwable.printStackTrace();
} else {
System.out.println(s);
}
}
});
// f.orTimeout(1, TimeUnit.SECONDS); // 1秒内未完成就抛timeout异常给CompletableFuture
f.completeOnTimeout(
"timeout", 1, TimeUnit.SECONDS); // 一秒内未完成会把timeout字符串作为结果传给CompletableFuture
Thread.sleep(2000); // 等待timeout的发生
}
4. 异步执行某任务,当任务完成时,将结果传给CompletableFuture。
static void supplyAsync() {
ExecutorService exec = Executors.newSingleThreadExecutor();
CompletableFuture<String> f =
CompletableFuture.supplyAsync(
new Supplier<>() {
@Override
public String get() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}
},
exec);
f.whenComplete(
new BiConsumer<>() {
@Override
public void accept(String s, Throwable throwable) {
System.out.println(s); // 输出hello
}
});
exec.shutdown();
}
5. 等待其他的所有CompletableFuture完成。
static void allOf() {
CompletableFuture<Integer> f1 = new CompletableFuture<>();
CompletableFuture<Integer> f2 = new CompletableFuture<>();
CompletableFuture<Void> f = CompletableFuture.allOf(f1, f2);
f.whenComplete(
new BiConsumer<>() {
@Override
public void accept(Void aVoid, Throwable throwable) {
System.out.println(f1.getNow(0) + f2.getNow(0)); // 输出3
}
});
f1.complete(1);
f2.complete(2);
}
6. 将异常结果转成正常结果。
static void exceptionally() {
CompletableFuture<String> f1 = new CompletableFuture<>();
// 如果f1是正常结果,则原结果会传给f2
// 如果f1是异常结果,就会调用下面的方法转成正常结果,然后再传给f2
CompletableFuture<String> f2 =
f1.exceptionally(
new Function<>() {
@Override
public String apply(Throwable throwable) {
return "exception";
}
});
// thenAccept方法传入的函数只有在f2是正常结果时才会被调用
f2.thenAccept(
new Consumer<>() {
@Override
public void accept(String s) {
System.out.println(s); // 输出exception
}
});
f1.completeExceptionally(new RuntimeException());
}
7. 对结果做类型转换。
static void handle() {
CompletableFuture<String> f = new CompletableFuture<>();
// 当f完成后会执行handle传入的方法
f.handle(
new BiFunction<String, Throwable, Integer>() {
@Override
public Integer apply(String s, Throwable throwable) {
if (throwable != null) {
return -1;
}
return Integer.valueOf(s);
}
})
.thenAccept(
new Consumer<>() {
@Override
public void accept(Integer integer) {
System.out.println(integer);
}
});
// f.complete("1"); // 输出1
f.completeExceptionally(new RuntimeException()); // 输出-1
}
8. 写个尽量完整的例子,看下各个方法是如何结合在一起使用的。
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public class DataLoader {
private static final AtomicInteger idGen = new AtomicInteger();
private final ConcurrentMap<Integer, CompletableFuture<byte[]>> futureMap;
private RemoteDataLoader loader;
public DataLoader() {
this.futureMap = new ConcurrentHashMap<>();
}
public void load(int dataID, Consumer<byte[]> dataConsumer, Executor executor) {
int futureID = idGen.incrementAndGet();
CompletableFuture<byte[]> f = new CompletableFuture<>();
// 当future完成后,做些清理工作,然后将数据传给用户
f.whenCompleteAsync(
new BiConsumer<>() {
@Override
public void accept(byte[] data, Throwable throwable) {
futureMap.remove(futureID);
dataConsumer.accept(data);
}
},
executor);
// 3秒内没结果就返回null
f.completeOnTimeout("null".getBytes(), 3, TimeUnit.SECONDS);
// 将future放入map中
futureMap.put(futureID, f);
// 通知remote加载数据并将结果以回调remoteDataLoaded方法的形式返回
loader.load(dataID, futureID);
}
public void remoteDataLoaded(int futureID, byte[] data) {
CompletableFuture<byte[]> f = futureMap.get(futureID);
if (f != null) {
f.complete(data);
}
}
public interface RemoteDataLoader {
void load(int dataID, int futureID);
}
public static void main(String[] args) throws InterruptedException {
DataLoader loader = new DataLoader();
loader.loader =
new RemoteDataLoader() {
@Override
public void load(int dataID, int futureID) {
if (dataID > 0) {
loader.remoteDataLoaded(futureID, "hello".getBytes());
}
// 如果dataID非法,则不返回数据,DataLoader里就会报timeout异常
}
};
// 所有返回的数据都用该Executor执行输出操作
ExecutorService exec = Executors.newCachedThreadPool();
// 正常数据加载,输出hello
loader.load(
1,
new Consumer<>() {
@Override
public void accept(byte[] data) {
System.out.println(new String(data));
}
},
exec);
// 异常数据加载,发生timeout,输出null
loader.load(
0,
new Consumer<>() {
@Override
public void accept(byte[] data) {
System.out.println(new String(data));
}
},
exec);
// 等待timeout发生
Thread.sleep(4000);
// 关闭ExecutorService
exec.shutdown();
}
}
例子中的逻辑不是非常完善,但基本上可以展示CompletableFuture在项目中如何使用,当然,CompletableFuture还有更加复杂和强大的用法,这里就不一一介绍了,感兴趣的朋友可以点击阅读原文,查看其完整的api。
希望对你有所帮助。
完。
本文分享自 Linux内核及JVM底层相关技术研究 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!