首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >RxJava2.X 源码分析(四):观察者线程切换原理

RxJava2.X 源码分析(四):观察者线程切换原理

作者头像
陈宇明
发布2020-12-15 14:55:56
发布2020-12-15 14:55:56
7320
举报
文章被收录于专栏:设计模式设计模式

作者博客

http://www.cherylgood.cn

前言

基于RxJava2.1.1

我们在前面的 RxJava2.0使用详解(一)初步分析了RxJava从创建到执行的流程。RxJava2.0使用详解(二) 中分析了RxJava的随意终止Reactive流的能力的来源;也明白了RxJava的onComplete();与onError(t);只有一个会被执行的秘密。RxJava2.X 源码分析(三)中探索了RxJava2调用subscribeOn切换被观察者线程的原理。

本次我们将继续探索RxJava2.x切换观察者的原理,分析observeOn与subscribeOn的不同之处。继续实现我们在第一篇中定下的小目标

从Demo到原理

OK,我们的Demo还是上次的demo,忘记了的小伙伴可以点击RxJava2.X 源码分析(三),这里就不再重复了哦,我们直接进入正题。

Ok,按照套路,我们从observeOn方法入手。

Ok,我点~^_^

我们继续往下看,我猜套路跟subscribeOn的逃不多,也是采用装饰者模式,wrapper我们的Observable和Observer产生一个中间被观察者和观察中,通过中间被观察者订阅上游被观察者,通过中间观察者接收上游被观察者下发的数据,然后通过线程切换将数据传递给下游观察者。

Ok,我们来验证下才想。我觉得就是没完全猜对,也能猜对其中的大部分。

Ok,熟悉的RxJavaPlugins.onAssemblyhook处理,略过,直接看new ObservableObserveOn(this, scheduler, delayError, bufferSize)这句

Ok,果然,熟悉的模式,对我们上游的Observable,下游的Observerwrapper一次。 1、ObservableObserveOn继承了AbstractObservableWithUpstream 2、source保存上游的Observable 3、scheduler为本次的调度器 4、在下游调用subscribe订阅时触发->subscribeActual->Wrapper了下游的Observer观察者

3处:source为游Observable,下游Observer被wrapper到ObserveOnObserver,发生订阅数件,上游Observable开始执行subscribeActual,调用ObserveOnObserver的onSubscribe以及onNext、onError、onComplete等

OK,我们接着看Observer被包装进 ObserveOnObserver的样子,代码有点多,我们分段讲解

OK,执行玩这里之后,就到我们的onXX方法了

首先可无限调用的onNext

其次只能触发一次的onError,基本差不多

同样是只能触发一次的onComplete,同样的套路,就不说了

然后就是我们的关键点schedule();

什么?传入了this?那么说明什么呢?( ̄∇ ̄)

嗯?this是个runnable,没错,我们的ObserveOnObserver实现了Runnable接口

那么,接下来自然是调用run方法

好吧,在看drainNormal前,我们先看一个函数

true:1、订阅被取消cancelled==true,2、done==true onNext刚被调度完,onError或者onCompele被调用

继续看drainNormal

总结

Ok,看到这里我们基本了解了observeOn的实现流程,同样是老套路,使用装饰者模式,中间Wrapper了我们的Observable和Observer,通过中间增加一个Observable和Observer来实现线程的切换。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2017-07-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码个蛋 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档