implementation "io.reactivex.rxjava2:rxjava:2.0.8"
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'com.squareup.retrofit2:retrofit:2.9.0'
implementation 'com.squareup.retrofit2:converter-gson:2.1.0'
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.5.0'
注意,retrofit 2.9.0 已经内置了 java8 的 adapter,所以不需要 adapter-java8 的依赖了 另外,别忘了网络权限
public interface GitHubService {
@GET("users/{user}/repos")
Observable<List<Repo>> listRepos(@Path("user") String user);
@GET("users/{user}/repos")
CompletableFuture<List<Repo>> listRepos2(@Path("user") String user);
}
上面的是 rxjava 的用法,下面是 CompletableFuture 的用法。我写在一起是为了比对
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("https://api.github.com/")
.addConverterFactory(GsonConverterFactory.create())
.build();
GitHubService service = retrofit.create(GitHubService.class);
注意我们这里不需要添加 addCallAdapterFactory 了。因为 2.9.0 的源码里面内置了 CompletableFutureCallAdapterFactory。并且已经默认添加进去了
List<? extends CallAdapter.Factory> defaultCallAdapterFactories(
@Nullable Executor callbackExecutor) {
DefaultCallAdapterFactory executorFactory = new DefaultCallAdapterFactory(callbackExecutor);
return hasJava8Types
? asList(CompletableFutureCallAdapterFactory.INSTANCE, executorFactory)
: singletonList(executorFactory);
}
具体代码在 Retrofit.Builder build 的时候 defaultCallAdapterFactories 里面有
ExecutorService executorService = Executors.newFixedThreadPool(5);
CompletableFuture<List<Repo>> future = service.listRepos2("octocat").handle(new BiFunction<List<Repo>, Throwable, List<Repo>>() {
@Override
public List<Repo> apply(List<Repo> repos, Throwable throwable) {
Log.e("handleAsync", Thread.currentThread().getName());//***************(1)
Log.e("handleAsync", repos.toString());
return repos;
}
});
try {
future.get(5, TimeUnit.SECONDS);//这里必须加 try catch***************(2)
future.thenApply(new Function<List<Repo>, String>() {
@Override
public String apply(List<Repo> repos) {
Log.e("thenApply2", Thread.currentThread().getName());//前面必须要有get,不然不会输出main
return null;
}
}).thenApply(new Function<String, String>() {
@Override
public String apply(String repos) {
Log.e("thenApply2", Thread.currentThread().getName());//**********************(3)
return null;
}
});
} catch (Exception e) {
Log.e("handleAsync", e.toString());
}
接下来要解释几点 (1)1号注释那里输出啥? listRepos2 返回 BodyCallAdapter。里面就是熟悉的 call.enqueue。如果没有特别指明线程池,那么肯定是 okhttp 里面自带的线程 所以就算在 handleAsync 里面指明线程池也没用,它只是改变了 apply 函数里面的线程。要想用自己设计的线程池必须要 okhttp 里面自己设定 PS:就算自己设定了线程池,输出的线程名字依然没有变是因为 NamedRunnable 改了名字 (2)为啥一定要get 主线程没有特定的线程池,所以只能用get让后面的方法在main上运行 (3)3号注释输出啥 和上一个操作符指定的线程一致。这一点和rxjava很像
如果看过上一篇的话就会发现一个很奇怪的现象。在Java中 thenApply 会自动切换到主线程,而Android中和上一个操作符指明的线程一致。接下来就来分析一下为什么 (其实java中如果在 supplyAsync 里面添加了 Thread.sleep 也可以得到和 android 一样的结果,原因不明) 其实有了上面的使用基本也就知道了,CompletableFuture 是在主线程通过阻塞方法 get 来获取到子线程中的值的,根本不存在什么 Handler。严格意义上来讲里面没有“切换”的操作。更何况主线程也不可能通过 Executor 来包装。 那么接下来就大致讲解一下整个流程是如何串起来的。先来段示例代码
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
System.out.println("supplyAsync===" + Thread.currentThread().getName());
return "supplyAsync";
}
}).thenApply(new Function<String, String>() {
@Override
public String apply(String t) {
System.out.println("apply===" + Thread.currentThread().getName());
return "apply";
}
});
在讲解之前先介绍一下数据结构吧
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
volatile Completion stack;
abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
volatile Completion next; // Treiber stack link
}
}
每个 CompletableFuture 内部有一个 stack(只是名字是 stack,其实还是个链表)。stack 内部有一个链表
CompletableFuture<U> d = new CompletableFuture<U>();
e.execute(new AsyncSupply<U>(d, f));
AsyncSupply 是个 Runnable,所以看 run 方法
public void run() {
CompletableFuture<T> d; Supplier<? extends T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {//没有结果就等待结果
try {
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
很简单,先计算结果,然后postComplete
final void postComplete() {
CompletableFuture<?> f = this; Completion h;
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
if (f.casStack(h, t = h.next)) {// 从头遍历stack,并更新头元素
if (t != null) {
if (f != this) {
pushStack(h);// 如果f不是当前CompletableFuture,则将它的头结点压入到当前CompletableFuture的stack中,使树形结构变成链表结构,避免递归层次过深
continue;
}
h.next = null;// 如果是当前CompletableFuture, 解除头节点与栈的联系
}
f = (d = h.tryFire(NESTED)) == null ? this : d;//类似于钩子,执行对应 thenApply或者whenComplete的回调
}
}
}
supplyAsync 主要功能就是生成一个新的 CompletableFuture。等待 get 完成并且做点扫尾工作,然后 h.tryFire 下一个 CompletableFuture
CompletableFuture<V> d = newIncompleteFuture();
if (e != null || !d.uniApply(this, f, null)) {
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
push(c);//把当前的UniApply 放到next中
c.tryFire(SYNC);//tryFire下一个
}
看到了吧,也是新生成 CompletableFuture。和 rxjava 类似。
static final class UniApply<T,V> extends UniCompletion<T,V> {
Function<? super T,? extends V> fn;
UniApply(Executor executor, CompletableFuture<V> dep,
CompletableFuture<T> src,
Function<? super T,? extends V> fn) {
super(executor, dep, src); this.fn = fn;
}
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d; CompletableFuture<T> a;
if ((d = dep) == null ||
!d.uniApply(a = src, fn, mode > 0 ? null : this))//这里面计算thenApply的function
return null;
dep = null; src = null; fn = null;
return d.postFire(a, mode);//最后绕来绕去还是会回到 postComplete
}
}
很明显,每一步都生成一个新的 CompletableFuture,然后通过 stack 串起来
PS: supplyAsync 最后提到的 扫尾工作 之所以没有展开来讲是因为我也没看懂dep的意思。stack对应的链式调用到时看明白了。如果想了解详细内容,可以参考如下两篇文章 CompletableFuture原理解析 CompletableFuture 原理浅析