首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

当我在FlowableOnSubscribe类中调用onNext时,订阅者的onNext和onComplete函数不会运行

当在FlowableOnSubscribe类中调用onNext时,订阅者的onNext和onComplete函数不会运行的原因可能是订阅者未正确订阅该Flowable对象或者存在错误的订阅关系。

Flowable是RxJava中的一种可观察序列,用于支持背压(backpressure)的场景。在使用Flowable时,需要通过subscribe方法订阅该序列,并传入相应的订阅者(Subscriber)对象。

可能的原因和解决方法如下:

  1. 订阅者未正确订阅Flowable对象:在调用Flowable的subscribe方法时,需要传入一个订阅者对象,确保订阅者与Flowable建立正确的订阅关系。例如:
代码语言:txt
复制
Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onComplete();
    }
}, BackpressureStrategy.BUFFER);

flowable.subscribe(new Subscriber<Integer>() {
    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(Integer integer) {
        System.out.println("onNext: " + integer);
    }

    @Override
    public void onError(Throwable t) {
        System.out.println("onError: " + t.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }
});
  1. 订阅者未正确处理背压(backpressure):Flowable支持背压机制,即订阅者可以通过request方法告知发布者(Flowable)自己能够处理的数据量。如果订阅者未正确处理背压,可能导致订阅者的onNext和onComplete函数不会运行。在订阅者的onSubscribe方法中,需要调用request方法请求数据。例如:
代码语言:txt
复制
flowable.subscribe(new Subscriber<Integer>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription s) {
        subscription = s;
        subscription.request(1); // 请求处理1个数据
    }

    @Override
    public void onNext(Integer integer) {
        System.out.println("onNext: " + integer);
        subscription.request(1); // 请求处理下一个数据
    }

    @Override
    public void onError(Throwable t) {
        System.out.println("onError: " + t.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }
});

请注意,以上代码示例中并未涉及腾讯云相关产品和产品介绍链接地址,如有需要,请参考腾讯云官方文档或咨询腾讯云官方支持。

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

深入RxJava2 源码解析(一)

(发布)onNext方法, //那么数据订阅也就不会消费到数据 t.onSubscribe(emitter); try { //回调注册FlowableOnSubscribe... sourcesubscribe方法 //这个source其实就是创建Flow流注册数据产生,进一步验证了上文中 //提及其需要实现FlowableOnSubscribe...//当数据产生者(发布)频繁调用onNext方法,这里产生并发调用关系,wip变量是atomic变量, //当第一次执行drain函数,为0继续执行后面的流程,当快速继续调用onNext...= 0) { return; } int missed = 1; //这里downstream其实就是注册数据订阅,它是基BaseEmitter变量,前面初始化时调用了基构造函数...回归主题,当我们使用操作符线程池机制时候做法都是在数据发布后面进行相应函数操作: Disposable disposeable = scheduleObservable

1.2K20

Android RxJava:一文带你全面了解 背压策略

背压策略简介 2.1 定义 一种 控制事件流速 策略 2.2 作用 异步订阅关系 ,控制事件发送 & 接收速度 注:背压作用域 = 异步订阅关系,即 被观察 & 观察处在不同线程...由于第2节中提到,使用背压场景 = 异步订阅关系,所以下文中讲解主要是异步订阅关系场景,即 被观察 & 观察 工作不同线程 2....5.1.2 同步订阅情况 同步订阅 & 异步订阅 区别在于: - 同步订阅,被观察 & 观察工作于同1线程 - 同步订阅关系没有缓存区 ?...特别注意 同步订阅情况中使用FlowableEmitter.requested(),有以下几种使用特性需要注意: ?...而在异步订阅关系,反向控制原理是:通过RxJava内部固定调用被观察线程request(n) 从而 反向控制被观察发送事件速度 那么该什么时候调用被观察线程request(n) &

1.9K20
  • Carson带你学Android:图文详解RxJava背压策略

    背压策略简介 2.1 定义 一种 控制事件流速 策略 2.2 作用 异步订阅关系 ,控制事件发送 & 接收速度 注:背压作用域 = 异步订阅关系,即 被观察 & 观察处在不同线程 2.3...,所以下文中讲解主要是异步订阅关系场景,即 被观察 & 观察 工作不同线程 但由于同步订阅关系场景也可能出现流速不匹配问题,所以讲解异步情况后,会稍微讲解一下同步情况,以方便对比 5.1...同步订阅情况 同步订阅 & 异步订阅 区别在于: 同步订阅,被观察 & 观察工作于同1线程 同步订阅关系没有缓存区 被观察发送1个事件后,必须等待观察接收后,才能继续发下1个事件 /...为了方便大家理解该策略requested()使用,该节会先讲解同步订阅情况,再讲解异步订阅情况 5.2.1 同步订阅情况 原理说明 即在同步订阅情况,被观察 通过 FlowableEmitter.requested...,反向控制原理是:通过RxJava内部固定调用被观察线程request(n) 从而 反向控制被观察发送事件速度 那么该什么时候调用被观察线程request(n) & n 值该是多少呢

    1.2K10

    Rxjava2_Flowable_Sqlite_Android数据库访问实例

    一、使用Rxjava访问数据库优点: 1.随意线程控制,数据库操作一个线程,返回数据处理ui线程 2.随时订阅取消订阅,而不必再使用回调函数 3.对读取数据用rxjava进行过滤,流式处理...4.使用sqlbrite可以原生返回rxjava格式,同时是响应式数据库框架 (有数据添加更新自动调用之前订阅读取函数,达到有数据添加自动更新ui效果, 同时这个特性没有禁止方法,只能通过取消订阅停止这个功能...(Optional.fromNullable(item)); //import com.google.common.base.Optional;//安全检查,待会看调用代码,配合rxjava很好 e.onComplete...); } } 五、数据库调用使用方法 使用了lambda简化了表达式进一步简化代码: 简化方法:/app/build.gradle里面加入如下内容:(defaultConfig外面) compileOptions...(获得数据库实例): //全局定义实例获取,以后想要换数据库,只需在这个里切换即可 public class Injection { public static DbSource getDbSource

    67320

    RxJava Flowable Processor

    同一个线程生产一个就消费了,不会产生问题,异步线程,如果生产速度大于消费速度,就会产生 Backpressure 问题。...request 这个方法若不调用,下游 onNextOnComplete不会调用。...异步调用时,RxJava 中有个缓存池,用来缓存消费处理不了暂时缓存下来数据,缓存池默认大小为 128,即只能缓存 128 个事件。...无论 request() 传入数字比 128 大或小,缓存池中刚开始都会存入 128 个事件。如果本身并没有这么多事件需要发送,则不会存 128 个事件。...BehaviorProcessor 发射订阅之前一个数据订阅之后全部数据。如果订阅之前没有值,可以使用默认值。 PublishProcessor 从哪里订阅就从哪里发射数据。

    2.2K20

    Android RxJava 使用

    操作符 则是把发布数据进行处理,再给订阅 ---- 发布订阅之间传递事件总共有三种 onNext(): 发送事件数据 onCompleted(): 事件队列完结。...RxJava 规定,当不会再有新 onNext() 发出,需要触发 onCompleted() 方法作为标志。 onError(): 事件队列异常。...一个正确运行事件序列, onCompleted() onError() 有且只有一个,并且是事件序列最后一个。...需要注意是,onCompleted() onError() 二也是互斥,即在队列调用了其中一个,就不应该再调用另一个。...) 背压(backpressure):只有上下游运行在各自线程,且上游发射数据速度大于下游接收处理数据速度,才会产生背压问题。

    2.2K30

    Android Rxjava :最简单&全面背压讲解 (Flowable)

    Rxjava背压:被观察发送事件速度大于观察接收事件速度,观察内会创建一个无限制大少缓冲池存储未接收事件,因此当存储事件越来越多时就会导致OOM出现。...总结 :当被观察发送事件大于128,观察抛出异常并终止接收事件,但不会影响被观察继续发送事件。 4.2.2. BUFFER 把上面例子改为BUFFER类型,执行结果如下: ?...总结 :MISSING就是没有采取背压策略类型,效果跟Obserable一样。 设置MISSING类型,可以配合onBackPressure相关操作符使用,也可以到达上述其他类型处理效果。...4.4.2 request扩展使用 request还可进行扩展使用,当遇到接收事件想追加接收数量(如:通信数据通过几次接收,验证准确性应用场景),可以通过以下方式进行扩展: Flowable.create...4.5 requested requested 与 request不是同一函数,但它们都是属于FlowableEmitter方法,那么requested()是有什么作用呢,看看以下例子: Flowable.create

    1.6K20

    RxJava之背压策略

    接着来看drain方法: 上面我们知道onSubscribe调用request()设置当前AtomicLongvalue值。 void drain() { //......= r不成立,e == r判断,在从onNext过来时empty为false,所以直接跳出 for循环。...我们在上面的drain()调用a.onNext(o)最终是调用observeOn构建对象ObserveOnSubscriberonNext,即调用runAsync();。...然后当e == limit是,回去调用LatestAsyncEmitterrequest(e),而limit是构造函数初始化,值为缓存队列容量Flowable.bufferSize() 3/4...然后LatestAsyncEmitter一样,当下游缓存队列满了之后,即不再放下游发送事件,只是把上游事件保存在SpscLinkedArrayQueue,等待下游处理了容量3/4事件之后,上游发送容量

    77620

    Rxjava 2.x 源码系列 - 基础框架分析

    > onNext > onComplete,若中途出错了,那调用顺序可能是这样 onSubscribe > onNext > onError onSubscribe 方法,当我调用 Observable...onError 方法与 onComplete 方法可以说是互斥调用了其中一个方法就不会调用另外一个方法 ---- 源码解析 基本使用 讲解原理之前,我们先来看一下 Rxjava 一个基本使用。...subscribeActual 方法,而我们知道 Observable subscribeActual 是抽象方法,因此,我们只需要关注其实现 subscribeActual 方法。... onNext onComplete 方法,而不会调用 onError 方法) 若在调用 onNext 方法过程中出错,那调用顺序可能是这样 Observable subcrible > Observable...,包装了 observer,调用 emitter 相应方法 ,会进而调用 observer onNext onError 方法,而不会调用 onComplete 方法 ) ---- observable

    52220

    Rx Java 异步编程框架

    但是ReactiveX,很多指令可能是并行执行,之后他们执行结果才会被观察捕获,顺序是不确定。为达到这个目的,你定义一种获取变换数据机制,而不是调用一个方法。...可观察对象,Rx定义为更强大Iterable,观察模式是被观察对象,一旦数据产生或发生变化,会通过某种方式通知观察订阅; Observer 观察对象,监听 Observable...Schedulers.io():一组动态更改线程上运行 I/O 或阻塞操作。 Schedulers.single():以顺序 FIFO 方式单个线程上运行工作。...Reactive Streams 规范定义发布订阅之间交互相对严格,以至于由于某些时间要求和需要通过 Subscription.request (long) 准备无效请求数量而导致严重性能损失...根据上面的代码结果输出可以看到,当我调用 subscription.request(n) 方法时候,会等onSubscribe()后面的代码执行完成后,才会立刻执行到onNext方法。

    3K20

    RxJava2学习笔记(3)

    Thread.sleep(1000);    注意: onSubscribe 里有一行s.request(2),相当于消费订阅,告诉生产,只能处理2条记录。...然后跑起来,就真的只有2条输出了: onNext->0 onNext->1 值得一提是:剩下消息,虽然消费不再处理了,但是生产实际上还会继续发,大家可以emitter.onNext(i)这后面...之所以这么设计,大家可以思考一下,因为一个生产射出来东西,可能有多个消费消费,如果因为某1个消费说:哎呀,太多了,我消化不了,你赶紧停下!...值为0,下游就开始报错了,也就是说这时已经达到了消费处理极限。...,生产还是一直持续不停发送,但是并没有发射满200次,而是正好等于缓冲区大小128(关于128这个数字,可参考本文最后参考文章)。

    1.4K60

    Carson带你学Android:手把手带你源码分析RxJava

    Disposable.dispose()),则调用观察(Observer)同名方法 = onNext() // 观察onNext()内容 = 使用步骤2复写内容... * 定义:RxJava 内置一个实现了 Observer 抽象 * 作用:扩展Observer 接口 = 新增了2个方法 = * 1. onStart():还未响应事件前调用...= 步骤1创建被观察(Observable)创建ObservableCreate * 即 订阅,实际上是调用了步骤1创建被观察(Observable)创建ObservableCreate...源码总结 步骤1(创建被观察(Observable))、步骤2(创建观察(Observer)),仅仅只是定义了发送事件 & 响应事件行为; 只有步骤3(订阅),才开始发送事件 & 响应事件...总结 本文主要对 RxJava2 订阅流程进行了源码分析

    35610

    Android RxJava:一步步带你源码分析 RxJava

    订阅流程 使用 2.1 使用步骤 RxJava订阅流程 使用方式 = 基于事件流链式调用,具体步骤如下: 步骤1:创建被观察(Observable)& 定义需发送事件 步骤2:创建观察(...Disposable.dispose()),则调用观察(Observer)同名方法 = onNext() // 观察onNext()内容 = 使用步骤2复写内容... * 定义:RxJava 内置一个实现了 Observer 抽象 * 作用:扩展Observer 接口 = 新增了2个方法 = * 1. onStart():还未响应事件前调用...= 步骤1创建被观察(Observable)创建ObservableCreate * 即 订阅,实际上是调用了步骤1创建被观察(Observable)创建ObservableCreate...源码总结 步骤1(创建被观察(Observable))、步骤2(创建观察(Observer)),仅仅只是定义了发送事件 & 响应事件行为; 只有步骤3(订阅),才开始发送事件 & 响应事件

    58810

    再忆RxJava---背压策略

    未雨绸缪(事情还没有发生之前做一定处理),一共有两种 (1)控制被观察发送事件速度---反馈控制 (2)控制观察接收事件速度---响应式拉取 2.2 亡羊补牢(事情已经发生,如何补救)...,就走onError,没有就往下一个onNext走 (可以先看一下ObserveOnSubscriberonSubscribe函数,里面有queue构造,以及sourceMode其实并没有赋值)...queue.offer(t)) {//这个queue就是FlowableObserveOn构造函数prefetch大小一个队列。...主线程s.request来控制要取多少数据,不设置就永远没有onNext打印出来(有点类似于线程池) 3.2.1 控制被观察发送事件速度---反馈控制 由于观察被观察处于不同线程,所以被观察无法通过...=0,或者drop,直接不管 3.2.2 控制观察接收事件速度---响应式拉取 比如发送100,s.request(50),那么也就是说还会有50个缓存队列里面。

    66720

    kotlin--使用观察、装饰模式实现响应式风格架构

    封装被观察 上面代码创建订阅关系,直接创建了一个被观察匿名实现,我们可以继续对观察进行封装,不对外暴露被观察 1.定义发射器接口 内部使用发射器,来替换直接调用观察方法,发射器拥有观察相同一部分方法...目前上流被观察发送数据下流观察接收数据是相同类型实际开发,很可能会对该数据类型进行转换,我们不希望观察中进行转换,因为这样代码结构显得不优雅,而且其转换过程可能是异步,那么如何在上流就进行转换...数据转换接口实现 实例化一个观察,对原来被观察进行订阅,并在该观察方法中使用数据转换函数后,调用外部传入观察方法 /** * 转换后新被观察,就是将原来被观察装饰了下 */...目前发送数据接收数据处于同一个线程,如果想要使得上流发送数据子线程,只需要包装被观察订阅方法,订阅方法目前在下面的地方调用: 它们都继承至ObservableProxy,所以只需要在ObservableProxy...定义一个线程调度方法,并在这两个地方调用即可 1.上下流线程调度 由于Java没有Looper,所以如果不指定下流使用子线程,那么上下流将会在同一线程执行,抽象,定义两个变量,来表示上流下流是否使用线程

    56120
    领券