前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJava2.X 源码解析(二) :神秘的取消订阅流程

RxJava2.X 源码解析(二) :神秘的取消订阅流程

作者头像
陈宇明
发布2020-12-15 14:54:50
7880
发布2020-12-15 14:54:50
举报
文章被收录于专栏:设计模式

作者博客

http://www.cherylgood.cn/

前言

基于RxJava2.1.1

我们在前一篇# RxJava2.0源码解析(一)初步分析了RxJava从创建到执行的流程

本篇我们将探索RxJava2.x提供给我们的Disposable能力的来源。

要相信,任何神奇的功能,当你探索了其本质之后,收获都是巨大的。

从Demo到原理

( ̄∇ ̄)猜猜会输出什么呢?

在发送玩hello之后,成功终止了后面的Reactive流。从结果我们还发现,后面的Reactive流被终止了,也就是订阅者或者观察者收不到后面的信息了,但是生产者或者说被订阅者、被观察者的代码还是会继续执行的。

Ok,我们从哪开始入手呢?我们发现,在我们执行了 disposable.dispose();后,触发了该事件,我们看下 disposable.dispose();到底做了什么呢,很开心的,我们点进 disposable.dispose();的源码,╮(╯_╰)╭,好吧,只是接口

此时我们要回忆一下上一篇的一段代码

我们之前分析到在执行source.subscribe(parent);触发数据分发事件之前先执行了observer.onSubscribe(parent);这句代码,所传入的parent也就对应了我们的Disposable

parent是CreateEmitter类型的,但是CreateEmitter是实现了Disposable接口的一个类。而parent又是我们的observer的一个包装后的对象。

OK,分析到这里我们来整理下前面的环节,根据Demo来解释下:首先在执行下面代码之前

先执行了observer.onSubscribe(parent);,我们在demo中也是通过传入的parent调用其dispose方法来终止Reactive流,而执行分发hello等数据的e也是我们的parent,也就是他们都是同一个对象。而执行e.onNext("hello");的e对象也是observer的一个包装后的ObservableEmitter类型的对象。

总结:Observer自己来控制了Reactive流状态。

Ok,此时如果我说关键点应该在ObservableEmitter这个类上面,你觉得可能性有多少呢?( ̄∇ ̄)

关键点就是CreateEmitter<T> parent = new CreateEmitter<T>(observer);这个包装的过程,我们来看下其源码

因为其实现了ObservableEmitter<T>, Disposable接口类,所以需实现其方法。这里其实是使用了装饰者模式,其魅力所在一会就会看到了。

我们可以看到在ObservableEmitter内部通过final Observersuper T> observer;存储了我们的observer,这样有什么用呢?看Demo,我们在调用e.onNext("hello");时,调用的时ObservableEmitter对象的onNext方法,然后ObservableEmitter对象的onNext方法内部再通过observer调用onNext方法,但是从源码我们可以发现,其并不是简单的调用哦。

1、先判断传入的数据是否为null

2、判断isDisposed(),如果isDisposed()返回false则不执行onNext。

isDisposed()什么时候会返回false呢?按照demo,也就是我们调用了disposable.dispose();后,disposable前面分析了就是CreateEmitter<T> parent,我们看CreateEmitter.dispose()

里面调用DisposableHelper.dispose(this);,我们看isDisposed()

RxJava的onComplete();与onError(t);只有一个会被执行的秘密原来是它?

再看另外两个方法的调用

其内部也基本做了同样的操作,先判断!isDisposed()后再决定是否执行。

但是再这里还有一点哦,我们应该知道onComplete();和onError(t)只有一个会发生,其实现原理也是通过isDisposed这个方法哦,我们可以看到,不关是先执行onComplete();还是先执行onError(t),最终都会调用dispose();,而调用了dispose();后,isDisposed()为false,也就不会再执行另外一个了。而且如果人为先调用onError再调用onComplete,onComplete不会被触发,而且会抛出NullPointerException异常。

小结:

此时我们的目的基本达到了,我们知道了Reactive流是如何被终止的以及RxJava的onComplete();与onError(t);只有一个会被执行的原因。

我们虽然知道了原因,但是秉着刨根问底的态度,抵挡不住内心的好奇,我还是决定挖一挖DisposableHelper这个类,当然如果不想了解DisposableHelper的话,看到这里也就可以了;

Ok,前面分析到,代码里调用了DisposableHelper类的静态方法,我们看下其调用的两个静态方法分别做了什么?

1、DISPOSED:作为是否要终止的枚举类型的标识

2、isDisposed:判断上次记录的终点标识的是否是 当前执行的Observer,如果是返回true

3、dispose:采用了原子性引用类AtomicReference,目的是防止多线程操作出现的错误。

更详细的分析放入了代码中

总结

通过本次,1、我们了解了RxJava的随意终止Reactive流的能力的来源;2、过程中也明白了RxJava的onComplete();与onError(t);只有一个会被执行的秘密。

实现该能力的主要方式还是利用了装饰者模式

从中体会了设计模式的魅力所在,当然我们还接触了AtomicReference这个类,在平时估计很少接触到。

后续会继续分析RxJava的各种魔力点。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2017-07-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码个蛋 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档