前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Future系列(CompletableFuture与retrofit)使用和解析

Future系列(CompletableFuture与retrofit)使用和解析

作者头像
提莫队长
发布2021-03-11 11:26:50
1.2K0
发布2021-03-11 11:26:50
举报
文章被收录于专栏:刘晓杰

一、在Android中的使用

1.gradle依赖
代码语言:javascript
复制
    implementation "io.reactivex.rxjava2:rxjava:2.0.8"
    implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
    implementation 'com.squareup.retrofit2:retrofit:2.9.0'
    implementation 'com.squareup.retrofit2:converter-gson:2.1.0'
    implementation 'com.squareup.retrofit2:adapter-rxjava2:2.5.0'

注意,retrofit 2.9.0 已经内置了 java8 的 adapter,所以不需要 adapter-java8 的依赖了 另外,别忘了网络权限

2.定义接口
代码语言:javascript
复制
public interface GitHubService {
    @GET("users/{user}/repos")
    Observable<List<Repo>> listRepos(@Path("user") String user);

    @GET("users/{user}/repos")
    CompletableFuture<List<Repo>> listRepos2(@Path("user") String user);
}

上面的是 rxjava 的用法,下面是 CompletableFuture 的用法。我写在一起是为了比对

3.定义retrofit
代码语言:javascript
复制
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("https://api.github.com/")
                .addConverterFactory(GsonConverterFactory.create())
                .build();

        GitHubService service = retrofit.create(GitHubService.class);

注意我们这里不需要添加 addCallAdapterFactory 了。因为 2.9.0 的源码里面内置了 CompletableFutureCallAdapterFactory。并且已经默认添加进去了

代码语言:javascript
复制
    List<? extends CallAdapter.Factory> defaultCallAdapterFactories(
            @Nullable Executor callbackExecutor) {
        DefaultCallAdapterFactory executorFactory = new DefaultCallAdapterFactory(callbackExecutor);
        return hasJava8Types
                ? asList(CompletableFutureCallAdapterFactory.INSTANCE, executorFactory)
                : singletonList(executorFactory);
    }

具体代码在 Retrofit.Builder build 的时候 defaultCallAdapterFactories 里面有

4.网络请求
代码语言:javascript
复制
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        CompletableFuture<List<Repo>> future = service.listRepos2("octocat").handle(new BiFunction<List<Repo>, Throwable, List<Repo>>() {
            @Override
            public List<Repo> apply(List<Repo> repos, Throwable throwable) {
                Log.e("handleAsync", Thread.currentThread().getName());//***************(1)
                Log.e("handleAsync", repos.toString());
                return repos;
            }
        });
        try {
            future.get(5, TimeUnit.SECONDS);//这里必须加 try catch***************(2)
            future.thenApply(new Function<List<Repo>, String>() {
                @Override
                public String apply(List<Repo> repos) {
                    Log.e("thenApply2", Thread.currentThread().getName());//前面必须要有get,不然不会输出main
                    return null;
                }
            }).thenApply(new Function<String, String>() {
                @Override
                public String apply(String repos) {
                    Log.e("thenApply2", Thread.currentThread().getName());//**********************(3)
                    return null;
                }
            });
        } catch (Exception e) {
            Log.e("handleAsync", e.toString());
        }

接下来要解释几点 (1)1号注释那里输出啥? listRepos2 返回 BodyCallAdapter。里面就是熟悉的 call.enqueue。如果没有特别指明线程池,那么肯定是 okhttp 里面自带的线程 所以就算在 handleAsync 里面指明线程池也没用,它只是改变了 apply 函数里面的线程。要想用自己设计的线程池必须要 okhttp 里面自己设定 PS:就算自己设定了线程池,输出的线程名字依然没有变是因为 NamedRunnable 改了名字 (2)为啥一定要get 主线程没有特定的线程池,所以只能用get让后面的方法在main上运行 (3)3号注释输出啥 和上一个操作符指定的线程一致。这一点和rxjava很像

二、原理解析

如果看过上一篇的话就会发现一个很奇怪的现象。在Java中 thenApply 会自动切换到主线程,而Android中和上一个操作符指明的线程一致。接下来就来分析一下为什么 (其实java中如果在 supplyAsync 里面添加了 Thread.sleep 也可以得到和 android 一样的结果,原因不明) 其实有了上面的使用基本也就知道了,CompletableFuture 是在主线程通过阻塞方法 get 来获取到子线程中的值的,根本不存在什么 Handler。严格意义上来讲里面没有“切换”的操作。更何况主线程也不可能通过 Executor 来包装。 那么接下来就大致讲解一下整个流程是如何串起来的。先来段示例代码

代码语言:javascript
复制
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                }
                System.out.println("supplyAsync===" + Thread.currentThread().getName());
                return "supplyAsync";
            }
        }).thenApply(new Function<String, String>() {
            @Override
            public String apply(String t) {
                System.out.println("apply===" + Thread.currentThread().getName());
                return "apply";
            }
        });

在讲解之前先介绍一下数据结构吧

代码语言:javascript
复制
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    volatile Completion stack;

    abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
        volatile Completion next;      // Treiber stack link
    }
}

每个 CompletableFuture 内部有一个 stack(只是名字是 stack,其实还是个链表)。stack 内部有一个链表

supplyAsync
代码语言:javascript
复制
        CompletableFuture<U> d = new CompletableFuture<U>();
        e.execute(new AsyncSupply<U>(d, f));

AsyncSupply 是个 Runnable,所以看 run 方法

代码语言:javascript
复制
        public void run() {
            CompletableFuture<T> d; Supplier<? extends T> f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                if (d.result == null) {//没有结果就等待结果
                    try {
                        d.completeValue(f.get());
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                d.postComplete();
            }
        }

很简单,先计算结果,然后postComplete

代码语言:javascript
复制
    final void postComplete() {
        CompletableFuture<?> f = this; Completion h;
        while ((h = f.stack) != null ||
               (f != this && (h = (f = this).stack) != null)) {
            CompletableFuture<?> d; Completion t;
            if (f.casStack(h, t = h.next)) {// 从头遍历stack,并更新头元素
                if (t != null) {
                    if (f != this) {
                        pushStack(h);// 如果f不是当前CompletableFuture,则将它的头结点压入到当前CompletableFuture的stack中,使树形结构变成链表结构,避免递归层次过深
                        continue;
                    }
                    h.next = null;// 如果是当前CompletableFuture, 解除头节点与栈的联系
                }
                f = (d = h.tryFire(NESTED)) == null ? this : d;//类似于钩子,执行对应 thenApply或者whenComplete的回调
            }
        }
    }

supplyAsync 主要功能就是生成一个新的 CompletableFuture。等待 get 完成并且做点扫尾工作,然后 h.tryFire 下一个 CompletableFuture

thenApply
代码语言:javascript
复制
        CompletableFuture<V> d = newIncompleteFuture();
        if (e != null || !d.uniApply(this, f, null)) {
            UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
            push(c);//把当前的UniApply 放到next中
            c.tryFire(SYNC);//tryFire下一个
        }

看到了吧,也是新生成 CompletableFuture。和 rxjava 类似。

代码语言:javascript
复制
    static final class UniApply<T,V> extends UniCompletion<T,V> {
        Function<? super T,? extends V> fn;
        UniApply(Executor executor, CompletableFuture<V> dep,
                 CompletableFuture<T> src,
                 Function<? super T,? extends V> fn) {
            super(executor, dep, src); this.fn = fn;
        }
        final CompletableFuture<V> tryFire(int mode) {
            CompletableFuture<V> d; CompletableFuture<T> a;
            if ((d = dep) == null ||
                !d.uniApply(a = src, fn, mode > 0 ? null : this))//这里面计算thenApply的function
                return null;
            dep = null; src = null; fn = null;
            return d.postFire(a, mode);//最后绕来绕去还是会回到 postComplete
        }
    }

很明显,每一步都生成一个新的 CompletableFuture,然后通过 stack 串起来

PS: supplyAsync 最后提到的 扫尾工作 之所以没有展开来讲是因为我也没看懂dep的意思。stack对应的链式调用到时看明白了。如果想了解详细内容,可以参考如下两篇文章 CompletableFuture原理解析 CompletableFuture 原理浅析

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、在Android中的使用
  • 二、原理解析
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档