首页
学习
活动
专区
圈层
工具
发布

RxJava 容易忽视的细节: subscribeOn() 方法没有按照预期地运行

Hot Observable 对 subscribeOn() 调用造成的影响 2.1 特殊的创建操作符 just just 是一个比较“特殊”的创建操作符,just 的作用是将单个数据转换为发射这个单个数据的...我们不难发现,上述执行结果中 just 操作符创建的 Observable 即使被订阅多次,所产生的值依然保持不变。...该值是从 Observable 外部生成的,而 Observable 仅将其存储以供以后使用。 另外,使用 just 操作符时,不需要 subscribe 订阅也会立即执行。...: from just generating Integer 上述代码,没有进行订阅也执行了打印“generating Integer”,而 Cold Observable 必须使用 subscribe...: Current Thread Name:RxComputationThreadPool-1, Consume: 0 因此,执行的结果运行在 computation() 线程上也不奇怪。

2.4K10

RxJs简介

Observables 也是如此,如果你不“调用”它(使用 subscribe),console.log(‘Hello’) 也不会执行。...这表明 subscribe 调用在同一 Observable 的多个观察者之间是不共享的。...在 Observable 执行中, 可能会发送零个到无穷多个 “Next” 通知。如果发送的是 “Error” 或 “Complete” 通知的话,那么之后不会再发送任何通知了。...调度器的时钟不需要与实际的挂钟时间有任何关系。这也就是为什么像 delay 这样的时间操作符不是在实际时间上操作的,而是取决于调度器的时钟时间。...调度器 目的 null 不传递任何调度器的话,会以同步递归的方式发送通知,用于定时操作或尾递归操作。 Rx.Scheduler.queue 当前事件帧中的队列调度(蹦床调度器),用于迭代操作。

4.8K10
  • 您找到你想要的搜索结果了吗?
    是的
    没有找到

    Rxjava源码解析笔记 | 创建Observable 与 ObserverSubscriber 以及之间订阅实现的源码分析

    简单回顾 如果抛开Rxjava的操作符以及其线程控制的话,Rxjava的最基本使用是比较简单的 第一步,创建被观察者Observable; 第二步,创建观察者Observer/Subscriber...; 一句话总结一下, Observable的就是通过代理类对象hook创建的, 而默认情况下,hook不会的OnSubscribe对象做任何的处理; 当然,默认不处理, 但是我们需要的话自然是可以个性化地重写这个方法的...这里可以看到如果传给subscribe()的对象是Observer, 则会在源码Rxjava的源码中首先被转换成Subscriber, 之后再进行后续操作; 也即应证了之前所说的Rxjava内部...()也就被调用执行; //第一步:创建被观察者:create Observable observable = Observable.create(new >Observable.OnSubscribe..., 就不会再执行onNext()(注意布尔变量done); (因完成状态/Completed或者异常状态/Error发生后, 就没必要在进行下一步/Next的操作了) (onComplete()

    1.8K30

    Rx Java 异步编程框架

    Flowable 是一个抽象类,但是由于要严格遵循大量的 Reactive Streams 规则,不建议通过直接扩展类来实现源和自定义操作符。...flatMap操作后的结果 flatMapObservable Observable 返回一个Observable,它发射对原Single的数据执行flatMap操作后的结果 from Single 将...操作符链添加多线程功能,你可以指定操作符(或者特定的Observable)在特定的调度器(Scheduler)上执行。...某些ReactiveX的Observable操作符有一些变体,它们可以接受一个Scheduler参数。这个参数指定操作符将它们的部分或全部任务放在一个特定的调度器上执行。...; FlatMap操作符使用一个指定的函数对原始 Observable 发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的 Observable,然后FlatMap合并这些 Observables

    3.8K20

    Rxjs源码解析(一)Observable

    () 方法创建了一个可观察对象 observable,然后通过 subscribe 方法订阅这个observable,订阅的时候会执行在 new Observable时候传入的函数参数,那么就来看下 new..._subscribe 就是在最开始 new Observable的时候传入的参数,所以只要有订阅操作(subscribe),就会执行这个方法protected _trySubscribe(sink: Subscriber...和 operator,这是为了方便链式操作,在当前版本中,官方已经不建议开发者直接调用这个方法了,主要是供给 rxjs内部众多的 operators 使用forEachforEach(next: (value...operator如果没有传入任何操作符方法,则直接返回 Observable 对象;如果只传入了一个操作符方法,则直接返回该操作符方法,否则返回一个函数,将在函数体里通过reduce方法依次执行所有的操作符...,执行的逻辑是将上一个操作符方法返回的值作为下一个操作符的参数,就像是一个管道串联起了所有的操作符,这里借鉴了函数式编程的思想,通过一个 pipe 函数将函数组合起来,上一个函数的输出成为下一个函数的输入参数最后

    2K50

    RxJS & React-Observables 硬核入门指南

    X(叉)表示由Observable发出的错误。 “completed”和“error”状态是最终状态。这意味着,observable在成功完成或遇到错误后不能发出任何数据。...: () => console.log('completed'); }); 执行 Observable 当Observable被订阅时,我们传递给新Observable构造函数的subscribe函数就会被执行...to the Observable observable.subscribe(subject); // Subscribe to the subject subject.subscribe({...在Epic内部,我们可以使用任何RxJS的可观察模式,这就是为什么redux-observable很有用。 例如:我们可以使用.filter操作符创建一个新的中间可观察对象。...但我不讨厌redux- tank,我喜欢它,我每天都在使用它! 练习1:调用API 用例:调用API来获取文章的注释。当API调用正在进行时显示加载器,并处理API错误。

    8K50

    RxSwift介绍(二)——Observable

    此处特意把error事件放在completed事件之后,打印结果证明,观察者在触发complete事件之后不会再响应任何事件。 ?...} .disposed(by: disposeBag) empty方法 该方法创建一个空内容的 Observable 序列,执行时直接执行completed方法....disposed(by: disposeBag) error方法 该方法创建一个不做任何操作,而是直接发送一个错误的 Observable 序列 //首先创建一个error类型的枚举 enum MyError....disposed(by: disposeBag) deferred方法 该方法相当于是创建一个 Observable 工厂,通过传入一个 block 来执行延迟 Observable序列创建的行为,...在创建Observable时,在订阅任何不同的观察者之后,代码一定会添加一行 .disposed(by: disposeBag) 代码,而 disposeBag 是之前全局创建生成的let disposeBag

    1.8K20

    RXJava原理_JavaScript的执行原理

    台灯(观察者)作为事件的处理方(处理的是“on”和“off”这两个事件),被动的执行on和off。 在产生和完成中间,即在事件由产生方传递到处理方的过程中需要被加 工,过滤和装换等操作。...Schedulers.immedate()表明直接运行在当前线程,不指定默认为该值; Schedulers.newThread()表明每次执行将开启新的线程; Schedulers.io() I/O 操作...()被激活的线程(事件产生),observableOn():指定Subscriber执行的线程,即事件消费的线程;光说不练假把式: Observable.just("1","2","3") .subscribeOn...(Schedulers.io())//指定subscribe()执行的线程为io线程 .observeOn(AndroidSchedulers.mainThread())//指定Subscriber回调执行线程为主线程...本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

    91220

    RxJS 入门到搬砖 之 Observable 和 Observer

    同样对于 Observable,如果你不“调用”它(使用 subscribe), console.log('Hello') 也不会被执行。...另外,“调用”和“订阅”是一个孤立的操作:两个函数调用触发两个单独的副作用,两个 Observable 订阅触发两个单独的副作用。...() 表示同步或异步地返回 0 或多个值 # Anatomy of an Observable Observable 使用 new Observable 或一个创建操作符来 created,会被 Observer...使用 observable.subscribe,给定的 Observer 不会在 Observable 中注册为监听器。Observable 甚至不维护一个 Observer 列表。...当 observable.subscribe 被调用时,Observer 被附加到新创建的 Observable 执行中,该调用还会返回 Subscription 对象。

    1.3K20

    RxJS mergeMap和switchMap

    接下来让我们来介绍一下高阶 observable 及如何利用它使得事情变得更简单。 高阶 Observables 一个 Observable 对象可以发出任何类型的值:数值、字符串、对象等等。...这里需要记住的是,observable 对象是 lazy 的,如果想要从一个 observable 对象中获取值,你必须执行订阅操作,比如: clicksToInterval$.subscribe(intervalObservable...() 操作符底层做的操作跟上面的例子一样,它获取 inner observable 对象,执行订阅操作,然后把值推给 observer (观察者)对象。...$ = click$.pipe( map(event => { return interval$; }), switchAll() ); observable$.subscribe...反之,使用 merge() 操作符,我们会有三个独立的 interval 对象。当源发出新值后,switch 操作符会对上一个内部的订阅对象执行取消订阅操作。

    2.5K41

    RxJS:给你如丝一般顺滑的编程体验(建议收藏)

    响应式编程 结合实际,如果你使用过Vue,必然能够第一时间想到,Vue的设计理念不也是一种响应式编程范式么,我们在编写代码的过程中,只需要关注数据的变化,不必手动去操作视图改变,这种Dom层的修改将随着相关数据的改变而自动改变并重新渲染...Observable 执行 // Observable 执行是通过使用观察者调用 subscribe 方法启动的 subscription.unsubscribe(); Subject (主体) 它是一个代理对象...最终代码的执行结果就是没有任何数据打印出来,分析一下原因其实也比较好理解,由于开启数据发送的时候还没有订阅,并且这是一个Hot Observables,它是不会理会你是否有没有订阅它,开启之后就会直接发送数据...这意味着,当您使用 queue 调度程序执行任务时,您确定它会在该调度程序调度的其他任何任务开始之前结束。 这个同步与我们平常理解的同步可能不太一样,笔者当时也都困惑了一会。...经过前面代码的洗礼,相信大家对该操作符已经不陌生了。

    8.4K99

    大佬们,一波RxJava 3.0来袭,请做好准备~

    背压(Backpressure) 当数据流通过异步的步骤执行时,这些步骤的执行速度可能不一致。也就是说上流数据发送太快,下流没有足够的能力去处理。...:发送0个N个的数据,不支持背压, io.reactivex.Single:只能发送单个数据或者一个错误 io.reactivex.Completable:没有发送任何数据,但只处理 onComplete...不建议再往下看了,建议点赞或收藏...下文关于操作符内容太多了等需要了,再来查阅下班时间还是好好护发吧 ?...与retry类似,但发生异常时,返回值是false表示继续执行(重复发射数据),true不再执行,但会调用onError方法。...同时不建议立马在项目上实践,给它点时间报bug。

    2.2K10

    源码阅读--RxJava(一)

    这里的事件可以是任何你感兴趣的东西(触摸事件,web接口调用返回的数据。。。) 一个Observable可以发出零个或者多个事件,知道结束或者出错。...Rxjava的看起来很想设计模式中的观察者模式,但是有一点明显不同,那就是如果一个Observerble没有任何的的Subscriber,那么这个Observable是不会发出任何事件的。...super T> subscriber) { return Observable.subscribe(subscriber, this); } 再看subscribe函数..., observable.onSubscribe).call(subscriber); //***************************相当于后面的参数.call,也就是执行构造函数中实例化的...所有的操作符都会调用lift这个函数,只是参数不同,这可以通过多态来实现 简而言之,这个库就是由观察者模式+多态来实现的。

    44020
    领券