作者博客
http://www.cherylgood.cn/
前言
基于RxJava2.1.1
我们在前一篇# RxJava2.0源码解析(一)初步分析了RxJava从创建到执行的流程。
本篇我们将探索RxJava2.x提供给我们的Disposable能力的来源。
要相信,任何神奇的功能,当你探索了其本质之后,收获都是巨大的。
从Demo到原理
( ̄∇ ̄)猜猜会输出什么呢?
在发送玩hello之后,成功终止了后面的Reactive流。从结果我们还发现,后面的Reactive流被终止了,也就是订阅者或者观察者收不到后面的信息了,但是生产者或者说被订阅者、被观察者的代码还是会继续执行的。
Ok,我们从哪开始入手呢?我们发现,在我们执行了 disposable.dispose();后,触发了该事件,我们看下 disposable.dispose();到底做了什么呢,很开心的,我们点进 disposable.dispose();的源码,╮(╯_╰)╭,好吧,只是接口
此时我们要回忆一下上一篇的一段代码
我们之前分析到在执行source.subscribe(parent);触发数据分发事件之前先执行了observer.onSubscribe(parent);这句代码,所传入的parent也就对应了我们的Disposable
parent是CreateEmitter类型的,但是CreateEmitter是实现了Disposable接口的一个类。而parent又是我们的observer的一个包装后的对象。
OK,分析到这里我们来整理下前面的环节,根据Demo来解释下:首先在执行下面代码之前
先执行了observer.onSubscribe(parent);,我们在demo中也是通过传入的parent调用其dispose方法来终止Reactive流,而执行分发hello等数据的e也是我们的parent,也就是他们都是同一个对象。而执行e.onNext("hello");的e对象也是observer的一个包装后的ObservableEmitter类型的对象。
总结:Observer自己来控制了Reactive流状态。
Ok,此时如果我说关键点应该在ObservableEmitter这个类上面,你觉得可能性有多少呢?( ̄∇ ̄)
关键点就是CreateEmitter<T> parent = new CreateEmitter<T>(observer);这个包装的过程,我们来看下其源码
因为其实现了ObservableEmitter<T>, Disposable接口类,所以需实现其方法。这里其实是使用了装饰者模式,其魅力所在一会就会看到了。
我们可以看到在ObservableEmitter内部通过final Observersuper T> observer;存储了我们的observer,这样有什么用呢?看Demo,我们在调用e.onNext("hello");时,调用的时ObservableEmitter对象的onNext方法,然后ObservableEmitter对象的onNext方法内部再通过observer调用onNext方法,但是从源码我们可以发现,其并不是简单的调用哦。
1、先判断传入的数据是否为null
2、判断isDisposed(),如果isDisposed()返回false则不执行onNext。
isDisposed()什么时候会返回false呢?按照demo,也就是我们调用了disposable.dispose();后,disposable前面分析了就是CreateEmitter<T> parent,我们看CreateEmitter.dispose()
里面调用DisposableHelper.dispose(this);,我们看isDisposed()
RxJava的onComplete();与onError(t);只有一个会被执行的秘密原来是它?
再看另外两个方法的调用
其内部也基本做了同样的操作,先判断!isDisposed()后再决定是否执行。
但是再这里还有一点哦,我们应该知道onComplete();和onError(t)只有一个会发生,其实现原理也是通过isDisposed这个方法哦,我们可以看到,不关是先执行onComplete();还是先执行onError(t),最终都会调用dispose();,而调用了dispose();后,isDisposed()为false,也就不会再执行另外一个了。而且如果人为先调用onError再调用onComplete,onComplete不会被触发,而且会抛出NullPointerException异常。
小结:
此时我们的目的基本达到了,我们知道了Reactive流是如何被终止的以及RxJava的onComplete();与onError(t);只有一个会被执行的原因。
我们虽然知道了原因,但是秉着刨根问底的态度,抵挡不住内心的好奇,我还是决定挖一挖DisposableHelper这个类,当然如果不想了解DisposableHelper的话,看到这里也就可以了;
Ok,前面分析到,代码里调用了DisposableHelper类的静态方法,我们看下其调用的两个静态方法分别做了什么?
1、DISPOSED:作为是否要终止的枚举类型的标识
2、isDisposed:判断上次记录的终点标识的是否是 当前执行的Observer,如果是返回true
3、dispose:采用了原子性引用类AtomicReference,目的是防止多线程操作出现的错误。
更详细的分析放入了代码中
总结
通过本次,1、我们了解了RxJava的随意终止Reactive流的能力的来源;2、过程中也明白了RxJava的onComplete();与onError(t);只有一个会被执行的秘密。
实现该能力的主要方式还是利用了装饰者模式
从中体会了设计模式的魅力所在,当然我们还接触了AtomicReference这个类,在平时估计很少接触到。
后续会继续分析RxJava的各种魔力点。