首页
学习
活动
专区
圈层
工具
发布

RxJava 容易忽视的细节: subscribeOn() 方法没有按照预期地运行

通常情况下,RxJava 发射的数据会在同一个线程上,但是稍作一些变化,发射的数据来自不同的线程会怎样呢?...,这段代码在3个线程上运行。...当我们的 subject 发射第一个值时,第一个观察者已经被订阅。由于订阅代码在我们调用 onNext() 时已经完成,因此订阅调度程序没有任何作用。...在这种情况下,当我们调用 onNext() 它类似于 PublishSubject 的工作方式。 第二和第三个观察者都在初始 onNext() 之后订阅。...这样,将尊重订阅调度程序,并在它提供的线程上通知观察者。 所有后续的发射的值都发生在订阅之后,因此,值再次与 onNext() 在同一线程上发出,类似于 PublishSubject 的工作方式。

2.4K10

RxBus 的初步探索

,我们发现synchronized线程锁,证明当前是线程安全的,当多个线程再要执行onNext,这里线程安全,排队线程会加入queue,然后依次执行。...PublishSubject 与普通的Subject不同,在订阅时并不立即触发订阅事件,而是允许我们在任意时刻手动调用onNext(),onError(),onCompleted来触发事件。...可以看到PublishSubject与普通的Subject最大的不同就是其可以先订阅事件,然后在某一时刻手动调用方法来触发事件。...(result); 我们可以根据我们的业务需求先对Subject进行订阅,然后再默一时刻触发我们的onNext。...原理总结 这里的publishSubject就是在我们发出通知的时候才会去onNext,而我们的onNext是线程安全的,当并发访问的时候,可以依次执行onNext,这里我们要用到ofType这个操作符

1.2K50
  • 您找到你想要的搜索结果了吗?
    是的
    没有找到

    RxJava 的 Subject

    PublishSubject Observer只接收PublishSubject被订阅之后发送的数据。...("publicSubject3"); subject.onNext("publicSubject4"); 执行结果: publicSubject:complete 因为subject在订阅之前...,都发射全部数据 PublishSubject 发送订阅之后全部数据 可能错过的事件 Subject 作为一个Observable时,可以不停地调用onNext()来发送事件,直到遇到onComplete...因为事件总线是基于发布/订阅模式实现的,如果某一事件在多个Activity/Fragment中被订阅的话,在App的任意地方一旦发布该事件,则多个订阅的地方都能够同时收到这一事件(在这里,订阅事件的Activity...每当用户处于弱网络时,打开一个App可能出现一片空白或者一直在loading,那用户一定会很烦躁。此时,如果能够预先加载一些数据,例如上一次打开App时保存的数据,这样不至于会损伤App的用户体验。

    1.6K20

    RxSwift介绍(三)——更加灵活的Subject

    在 RxSwift 框架中,提供了四种类型的 subject,首先要了解的一点就是提供的四种 subject 创建方式最主要的区别:当一个新的订阅者订阅到subject对象时,能否收到 subject...PublishSubject 最普通的 subject ,不需要初始值就可以创建,而且从订阅者开始订阅的时间点起,可以收到 subject 发出的新 event,而不会收到在订阅前已发出的 event...因此,在使用时必须在创建时设置 bufferSize,表示将会返回给订阅者对应个数最近缓存的旧 event (注:若一个订阅者去订阅已经结束的 ReplaySubject ,除了会收到缓存的 .next...与 BehaviorSubject 不同的是,Variable还会把当前发出的值保存为自己的状态,同时在销毁时自动发送 .completed event,不需要也不能手动给 Variable 发送终结事件...换个方式理解,Variable 有一个 value 属性,当改变 value 属性的值时就相当于调用一般 Subjects 的 onNext() 方法,而这个最新的 onNext() 的值就被保存在 value

    2K30

    Android 中 RxJava 的使用

    前言 Android原生的多线程和异步处理简直糟透了,反复的嵌套让代码看起来十分不明了,多线程上也没有iOS的dispatch好用,但是用了Rxjava后就会有所改善,虽然代码量看起来会多一点,但是逻辑就清晰多了...) 中转站(Subject) 线程(Scheduler) 操作符 ---- 形象的来说 发布者 就相当于 报社 订阅者 就相当于 用户 中转站 就相当于 报亭 它既是订阅者 又是发布者 线程 是指定在哪个线程上处理...操作符 则是把发布者的数据进行处理,再给订阅者 ---- 在发布者和订阅者之间传递的事件总共有三种 onNext(): 发送事件的数据 onCompleted(): 事件队列完结。...在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。...observable.subscribe(observer); 注意上面方法的顺序 看上去是发布者订阅了订阅者,之所以这样是因为链式代码的优雅 线程(Scheduler) 常用的方式是分线程中处理数据

    2.5K30

    Android RxJava+Retrofit完美封装(缓存,请求,生命周期管理)

    因为我们在每一个请求中都会处理code以及一些重用一些操作符,比如用observeOn和subscribeOn来切换线程。...然而 onStart()由于在 subscribe()发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe()被调用时的线程。所以onStart并不能保证永远在主线程运行。...千万不要小看了RxJava,与 onStart()相对应的有一个方法 doOnSubscribe(),它和 onStart()同样是在subscribe()调用后而且在事件发送前执行,但区别在于它可以指定线程...public class HttpUtil{ /** * 构造方法私有 */ private HttpUtil() { } /** * 在访问HttpUtil时创建单例 */ private.../** * 获取单例 */ public static HttpUtil getInstance() { return SingletonHolder.INSTANCE; } //添加线程管理并订阅

    3.8K11

    RxSwift-Subject即攻也守

    ,只是subject 把订阅流程和响应流程都内部实现,所以也就没有必要引入sink 各种Subject PublishSubject 可以不需要初始来进行初始化(也就是可以为空),并且它只会向订阅者发送在订阅之后才接收到的元素...// PublishSubject // 1:初始化序列 let publishSub = PublishSubject() //初始化一个PublishSubject 装着Int类型的序列...通过一个默认初始值来创建,当订阅者订阅BehaviorSubject时,会收到订阅后Subject上一个发出的Event,如果还没有收到任何数据,会发出一个默认值。...和publish 稍微不同就是behavior这个家伙有个存储功能:存储上一次的信号 // BehaviorSubject // 1:创建序列 let behaviorSub = BehaviorSubject.init...("订阅到了:",$0)} .disposed(by: disposbag) // 再次发送 behaviorSub.onNext(4) behaviorSub.onNext(5) // 再次订阅

    69110

    干货| 是时候对RxLifecycle来篇详解了

    随着Android第三库的普及,RxJava和RxAndroid 越来越被人熟知,简洁的语法,配合Java8 Lambda表达式,使代码的结构更加清晰,通过线程调度器更容易控制和切换线程,种种优点,使用它的人也越来越多...,因为是在onStart的时候调用,所以在onStop的时候自动取消订阅 .compose(this....在前两步一般都是不会出现问题的,但是在第三步,当数据返回给client端时,如果页面已经不在了,那么就无法去绘制UI,很有可能会导致意向不到的问题。...one"); subject.onNext("two"); subject.onCompleted(); 这里做的事情很简单,先创建一个PublishSubject -> 绑定一个myObserver...BaseActivity监听生命周期 那么我们先来实现生命周期监听功能,基本思路是:在BaseActivity里创建一 个PublishSubject对象,在每个生命周期发生时,把该生命周期事件传递给PublishSubject

    1.9K20

    项目需求讨论 — 手把手带你写RxPermission

    产品经理针对答复做出相应处理(订阅Observable) 我们可以看到,我们已经可以把第一次情况反馈给了产品经理,但这里可以有二种情况发生(主要看你跟产品经理关系铁不铁): ?...手机APP向系统提权限 没错,基本一模一样,唯一不同的地方就是图中蓝色那一块: 产品经理提需求 手机APP提权限 脑子想了一会,调用了onNext和onComplete回复。...跳出弹框,让用户点击,然后再调用onNext和onComplete回复。...中会针对这个列表里面的每个权限,去调用系统方法进行询问, 然后根据不同的结果去onNext和onComplete通知。...还记不记得我们跟产品经理的关系铁与不铁,还有不同的处理,就是都成功还算任务完成,或者分批告诉你每个需求的评估结果。

    79720

    RxJava基础操作符和高级操作符

    •onNext(Tvalue):接收到一个数据项时调用。•onError(Throwablee):发生错误时调用,流终止。•onComplete():流成功完成时调用,流终止。...•observeOn(Schedulerscheduler):指定下游(onNext,onError,onComplete)在哪个线程执行。可以多次调用,切换线程。...每个操作符(如map,filter)内部通常会调用lift(Operator)方法,创建一个新的Observable,这个新的Observable会包装上游的Observable,并在订阅时,将下游的Observer...•PublishSubject:向所有订阅者广播数据,订阅后才能收到数据。•ReplaySubject:缓存所有发射过的数据,新的订阅者会收到所有历史数据。...•BehaviorSubject:缓存最后一个数据,新的订阅者会立即收到这个最新数据,然后接收后续数据。•AsyncSubject:只在流完成时,向所有订阅者发射最后一个数据。

    12110

    【iOS】RxSwift官方Example3--地理位置监听

    图一是当App可以使用定位信息时,显示当前的经纬度。 图二是当App被禁止使用定位信息时,显示的提示信息 代码解释 比起上两个Example,这个Example复杂的多了。...当想绑定的在视图信息越多,我们就需要对UILabel进行扩展。...PublishSubject的概念 当你订阅PublishSubject的时候,你只能接收到订阅他之后发生的事件 因此为了能够成为代理的代理,我们需要监听代理的事件,并且能够让外部进行监听,所以我们创建了以下两个...lazy var didFailWithErrorSubject = PublishSubject() 将代理事件通过subject传递出去,记得调用_forwardToDelagate...因此,每次订阅authorized信息时,都会发送独立的序列,确保每次都会响应。

    1.4K20

    Android消息总线的演进之路:用LiveDataBus替代RxBus、EventBus

    响应式编程(Reactive Programming)技术这几年特别火,RxJava是它在Java上的实作。RxJava天生就是发布/订阅模式,而且很容易处理线程切换。...RxBus原理 在RxJava中有个Subject类,它继承Observable类,同时实现了Observer接口,因此Subject可以同时担当订阅者和被订阅者的角色,我们使用Subject的子类PublishSubject...来创建一个Subject对象(PublishSubject只有被订阅后才会把接收到的事件立刻发送给订阅者),在需要接收事件的地方,订阅该Subject对象,之后如果Subject对象接收到事件,则会发射给该订阅者...避免内存泄漏,观察者被绑定到组件的生命周期上,当被绑定的组件销毁(destroy)时,观察者会立刻自动清理自身的数据。...消息总线 消息总线通过单例实现,不同的消息通道存放在一个HashMap中。 订阅 订阅者通过getChannel获取消息通道,然后调用observe订阅这个通道的消息。

    2.8K30

    RxJava && Agera 从源码简要分析基本调用流程(2)

    对于我们而言,最常见的莫过于在非主线程获取并处理数据之后在主线程更新UI这样的场景了: [image.jpg] 这是我们十分常见的调用方法,一气呵成就把不同线程之间的处理都搞定了,因为是链式所以结构也很清晰...现在再结合之前的过程我们从头梳理一下: [image.jpg] 在subscribeOn()时,我们会新生成一个Observable,它的成员onSubscribe会在目标Subscriber订阅时使用传入的...之后,当我们调用subject.onNext()时,消息才被发送,Observer的onNext()被触发调用,输出了"Hello World"。...这里我们注意到,当订阅事件发生时,我们的subject是没有产生数据流的,直到它发射了"Hello World",数据流才开始运转,试想我们如果将订阅过程和subject.onNext()调换一下位置,...此时,我们可以在结束前按需要选择对数据流进行最后的配置,例如:调用onDeactivation()配置从“订阅”到“取消订阅”的过程是否需要继续执行数据流等等。

    10.7K10
    领券