RxJava给我们提供了很多变换的操作符,map、flatMap就是比较常用的操作符,一般我们使用的时候,都是看官方文档来了解每个操作符的含义,但是我自己感觉下来,看官方文档使用没问题,但是总有一点隔靴搔痒的意思,所以我还要去RxJava的源码一探究竟,做到心中有数。
我们先从相对简单的 Map 开始
官方定义:transform the items emitted by an Observable by applying a function to each item 拙劣的翻译:应用一个函数 转换所有的被发射的item
官方的图解:
map 图例.png
到这里我们总结一下:
这里抛出一个问题,map 调用我们提供的function进行转换,那么这个function在什么时候被调用?在哪个线程被调用?(这个对我们实际工程中使用map有意义,知道代码被执行的线程是必须的)
废话不多说,进入源码
Observable类是RxJava的门面,基本上所有的转换符都在这里定义,直接看Map 的方法定义
map 方法.png
可以看到,Function类,泛型有2个参数,第一个是原数据类型,第二个是转换后的数据类型,最终返回的是ObservableMap 类(RxJava的类命名很规范,如果是Observable类型的就是Observable开头 + 具体的操作符名称,如果是Observer类型的 就是 具体的操作符名称 + Observer结尾)我们进入ObservableMap类,Observable类之前的文章有提到过,subscribeActual 是个重要的钩子方法,所以我们直接看ObservableMap如何重写该方法的
ObservaleMap.png
方法代码就一行,调用装饰的Observable的subscribe方法,传递一个MapObserver对象,Observer类我们就比较熟悉了,我们这里主要看onNext方法
map onnext.png
代码也很简单,红框标识的就是 mapper 转换函数被调用的地方,得到转换后的对象v,传递给被装饰的Observer 的onNext方法,到这里,一次数据的map转换就结束了。源码的实现还是很简单的,在我们了解了源码的实现后,思路会更清晰,写代码时也会更有把握。
现在我们来解答前面我们抛出的问题,Function在什么时候被调用?在哪个线程被调用? Function调用的地方已经清楚了,在ObserverMap 的 onNext方法中,那么调用的线程呢,因为是在Observer方法中被调用,所以如果在map 之前 调用了 ObserverOn 方法设置监听线程,那么就在该监听线程,如果没有设置 ObserverOn 但是设置了 SubscribeOn方法设置发射线程,那么就在该 发射线程,如果SubscribeOn也没有设置,那就在Observable的创建线程。
到此Map 就介绍完了,接下来是Map 的好兄弟 FlatMap,调用逻辑稍微复杂一点点,看官们耐心 -。-
官方定义:transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable 拙劣的翻译:应用一个函数 转换所有的被发射的item从一个Observable转成为多个Observable,并将所有要发射的数据平铺为一个Observable
官方的图解:
flatmap 图例.png
到这里我们总结一下:
这里抛出一个问题,flatMap会将原来的Observable,转换为多个Observable来发射数据,那么这些发射的数据是否会严格按顺序发射然后被Observer接收?
问题先留在这里,进入源码
FlatMap操作符涉及的代码会相对多一些,但是也是有规律可循。 同样到Observable 类中看 flatMap的定义,源码作者为了方便开发者调用,提供了多个方法重载,我们最常用的方法定义如下
flatmap 方法1.png
最终调用的方法是
flatmap 方法.png
跟map 的套路 差不多,我们直接进入 ObservableFlatMap类, 我们还是看它的 subscribeActual 方法实现
ObservableFlatMap.png
可以看到,它给原Observer 装饰后的 Observer 是 MergeObserver,我们再继续看 MergeObserver 的 onNext 方法
MergeObserver onnext.png
由于我们默认调用的flatmap 的 maxConcurrency 大小是 Integer.MAX_VALUE, 所以最终会调用 subscribeInner(p),注意这里我们的mapper方法以及被调用了,p就是跟我们传入的Function生成的Observable,我们再继续往下看
MergeObserver subscribeInner.png
一般我们传入的Function 生成的Observable 都不是 Callable类型的,所以最终传给Observable p 的 是InnerObserver, 找到了最终元凶,直接去看它的onNext方法实现吧。
MergeObserver onnext.png
funsionMode 默认是 None,走第一个if 逻辑,最终调用的是 上面的MergeObserable 的 tryEmit 方法,继续进去看
MergeObservable tryEmit.png
这里要插一句,MergeObserver 继承了 AtomicInteger,所以这里的tryEmit方法就利用了 AtomicInteger 的同步机制,所以同时只会有一个 value 被 actual Observer 发射,而且这里 刚好 可以解答我们上面留下的 问题,由于 AtomicInteger CAS锁只能保证操作的原子性,并不保证锁的获取顺序,是抢占式的,所以最终数据的发射顺序并不是固定的(同一个Observable发出的数据是有序的)
如果没有获取到锁,就会将要发射的数据放入 队列中,drainLoop 方法会循环去获取队列中的 数据,然后发射,由于篇幅有限,更详细的调用过程大家可以看源码。
dramloop.png
Map 和 FlatMap 二个操作符的 源码就解析到这里,水平有限,有不对的,还望大佬不吝赐教。