上一篇文章中我们学习了RxJava2中 FlatMap 的原理,同时知道,FlatMap经过转换后发射的数据不是严格有序的,如果需要数据按顺序被发射,RxJava2提供了另外一个操作符, 也是这篇文章的主角 — ConcatMap.
之前分析了FlatMap发射数据无序的原因,但是没有实际用代码验证过,这里我们在分析ConcatMap源码之前,我们先运行测试代码,有个直观的感受。
示例:原始发射的数据是1,2,3,4,5,经过转换后10,100,1000,20,200...50,500,5000
List<String> resultList = new CopyOnWriteArrayList<>();
Observable.create((ObservableOnSubscribe<String>) emitter -> {
for (int i = 1; i < 6; i++) {
emitter.onNext(String.valueOf(i));
}
emitter.onComplete();
}).flatMap(s -> {
String tail = "";
ArrayList<String> newMap = new ArrayList<>();
for (int i = 0; i < 3; i++) {
newMap.add(s + tail);
tail = tail + "0";
}
return Observable.fromIterable(newMap);
}).doOnComplete(() -> {
L.d(resultList.toString());
}).subscribe(resultList::add);
直接看运行结果
flatmap_运行结果.png
看到结果,可能会有疑问,这里不是有序的吗,那不跟我们上面的结论矛盾吗?其实不矛盾,这里之所以有序是因为我们这里的数据转换只是简单的字符串拼接,执行速度非常快,才会产生FlatMap按顺序发射数据的假象,我们将FlatMap的转换加上50 ms 的delay,再运行一下。
List<String> resultList = new CopyOnWriteArrayList<>();
Observable.create((ObservableOnSubscribe<String>) emitter -> {
for (int i = 1; i < 6; i++) {
emitter.onNext(String.valueOf(i));
}
emitter.onComplete();
}).flatMap(s -> {
String tail = "";
ArrayList<String> newMap = new ArrayList<>();
for (int i = 0; i < 3; i++) {
newMap.add(s + tail);
tail = tail + "0";
}
return Observable.fromIterable(newMap).delay(50, TimeUnit.MILLISECONDS);
}).doOnComplete(() -> {
L.d(resultList.toString());
}).subscribe(resultList::add);
flatmap_delay 运行结果.png
可以看到,这一次数据的发射是无序的,验证了我们之前的结论(提一下,虽然1,2,3,4,5的顺序是乱的,但是1,100,1000 它们的顺序不会改变的,因为它们是同一个Observable发射的)
我们将上面的操作符flatmap 改成 concatMap 再看运行打印结果
List<String> resultList = new CopyOnWriteArrayList<>();
Observable.create((ObservableOnSubscribe<String>) emitter -> {
for (int i = 1; i < 6; i++) {
emitter.onNext(String.valueOf(i));
}
emitter.onComplete();
}).concatMap(s -> {
String tail = "";
ArrayList<String> newMap = new ArrayList<>();
for (int i = 0; i < 3; i++) {
newMap.add(s + tail);
tail = tail + "0";
}
return Observable.fromIterable(newMap).delay(50, TimeUnit.MILLISECONDS);
}).doOnComplete(() -> {
L.d(resultList.toString());
}).subscribe(resultList::add);
concatmap_运行结果.png
可以看到,虽然我们加上了50ms 的发射延时,数据仍然是完全按顺序发射的。废话不多说,直接看ConcatMap源码。
嗯,先看方法定义,熟悉的味道
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return concatMap(mapper, 2);
}
接着往下看
最终生成的是ObservableConcatMap(看名字就知道是个Observable)
ObservableConcatMap.png
我们还是看ObservableConcatMap 的 subscribeActual 方法
subscribeActual.png
我们传入的delayErrors 默认是 IMMEDIATE,所以生成的 是 SourceObserver对象,找到了最终的Observer,我们先看它的onNext方法实现
SourceObserver onNext.png
可以看到,新建了一个队列对象 queue,先将要发射的数据放入队列中,接下来重点看drain方法 (drain,英文渣渣特意查了字典,排水管,很形象是不是~~~)
drain.png
SourceObserver 同样继承了AtomicInteger,getAndIncrement方法保证自增的原子性,所以这里只有初始值为0时,执行下面的循环,进入循环,做了2个判断,一个是判断,是否已经disposed,如果是 清空队列并退出循环,还有个active字段,表示当前是否还有Observable在发射,(比如我们上面的例子,原始发射了5个数据,1,2,3,4,5,所以经过ConcatMap转换就有了5个 Observable,每个Observable各自携带3个数据,所以在这里如果用ConcatMap 操作符,这5个Observable都是严格排序发射,只有上一个发射完全完成之后,才会开始下一个,而且因为所有要发射的数据在之前已经加入到queue队列中,所以不曾在竞争,这样也就保证了数据发射的顺序)
调用我们提供的mapper,生成Observable,调用subscribe方法,传递的是InnerObserver
subscribe inner.png
重点看InnerObserver 的 onComplete 方法
inner oncomplete.png
里面最终调用了SourceObservable 的 innerComplete方法
parent inner compelte.png
我们看到在这里 将 active 设为false,同时调用了 drain 方法,循环获取队列中的数据,然后发射。这样,整个流程就清楚了,不是很难,但是读了源码,用的时候,会更有自信。以上,水平有限,看官们开心-v-