首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

当我在FlowableOnSubscribe类中调用onNext时,订阅者的onNext和onComplete函数不会运行

当在FlowableOnSubscribe类中调用onNext时,订阅者的onNext和onComplete函数不会运行的原因可能是订阅者未正确订阅该Flowable对象或者存在错误的订阅关系。

Flowable是RxJava中的一种可观察序列,用于支持背压(backpressure)的场景。在使用Flowable时,需要通过subscribe方法订阅该序列,并传入相应的订阅者(Subscriber)对象。

可能的原因和解决方法如下:

  1. 订阅者未正确订阅Flowable对象:在调用Flowable的subscribe方法时,需要传入一个订阅者对象,确保订阅者与Flowable建立正确的订阅关系。例如:
代码语言:txt
复制
Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onComplete();
    }
}, BackpressureStrategy.BUFFER);

flowable.subscribe(new Subscriber<Integer>() {
    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(Integer integer) {
        System.out.println("onNext: " + integer);
    }

    @Override
    public void onError(Throwable t) {
        System.out.println("onError: " + t.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }
});
  1. 订阅者未正确处理背压(backpressure):Flowable支持背压机制,即订阅者可以通过request方法告知发布者(Flowable)自己能够处理的数据量。如果订阅者未正确处理背压,可能导致订阅者的onNext和onComplete函数不会运行。在订阅者的onSubscribe方法中,需要调用request方法请求数据。例如:
代码语言:txt
复制
flowable.subscribe(new Subscriber<Integer>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription s) {
        subscription = s;
        subscription.request(1); // 请求处理1个数据
    }

    @Override
    public void onNext(Integer integer) {
        System.out.println("onNext: " + integer);
        subscription.request(1); // 请求处理下一个数据
    }

    @Override
    public void onError(Throwable t) {
        System.out.println("onError: " + t.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }
});

请注意,以上代码示例中并未涉及腾讯云相关产品和产品介绍链接地址,如有需要,请参考腾讯云官方文档或咨询腾讯云官方支持。

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

深入RxJava2 源码解析(一)

(发布)者的onNext方法, //那么数据订阅者也就不会消费到数据 t.onSubscribe(emitter); try { //回调注册的FlowableOnSubscribe... source的subscribe方法 //这个source其实就是在创建Flow流时注册的数据产生类,进一步验证了上文中 //提及的其需要实现FlowableOnSubscribe...//当数据的产生者(发布)频繁调用onNext方法时,这里产生并发调用关系,wip变量是atomic变量, //当第一次执行drain函数时,为0继续执行后面的流程,当快速的继续调用onNext...= 0) { return; } int missed = 1; //这里的downstream其实就是注册的数据订阅者,它是基类BaseEmitter的变量,前面初始化时调用了基类的构造函数...回归主题,当我们使用操作符和线程池机制的时候做法都是在数据发布者后面进行相应的函数操作: Disposable disposeable = scheduleObservable

1.2K20

Android RxJava:一文带你全面了解 背压策略

背压策略简介 2.1 定义 一种 控制事件流速 的策略 2.2 作用 在 异步订阅关系 中,控制事件发送 & 接收的速度 注:背压的作用域 = 异步订阅关系,即 被观察者 & 观察者处在不同线程中...由于第2节中提到,使用背压的场景 = 异步订阅关系,所以下文中讲解的主要是异步订阅关系场景,即 被观察者 & 观察者 工作在不同线程中 2....5.1.2 同步订阅情况 同步订阅 & 异步订阅 的区别在于: - 同步订阅中,被观察者 & 观察者工作于同1线程 - 同步订阅关系中没有缓存区 ?...特别注意 在同步订阅情况中使用FlowableEmitter.requested()时,有以下几种使用特性需要注意的: ?...而在异步订阅关系中,反向控制的原理是:通过RxJava内部固定调用被观察者线程中的request(n) 从而 反向控制被观察者的发送事件速度 那么该什么时候调用被观察者线程中的request(n) &

2K20
  • Carson带你学Android:图文详解RxJava背压策略

    背压策略简介 2.1 定义 一种 控制事件流速 的策略 2.2 作用 在 异步订阅关系 中,控制事件发送 & 接收的速度 注:背压的作用域 = 异步订阅关系,即 被观察者 & 观察者处在不同线程中 2.3...,所以下文中讲解的主要是异步订阅关系场景,即 被观察者 & 观察者 工作在不同线程中 但由于在同步订阅关系的场景也可能出现流速不匹配的问题,所以在讲解异步情况后,会稍微讲解一下同步情况,以方便对比 5.1...同步订阅情况 同步订阅 & 异步订阅 的区别在于: 同步订阅中,被观察者 & 观察者工作于同1线程 同步订阅关系中没有缓存区 被观察者在发送1个事件后,必须等待观察者接收后,才能继续发下1个事件 /...为了方便大家理解该策略中的requested()使用,该节会先讲解同步订阅情况,再讲解异步订阅情况 5.2.1 同步订阅情况 原理说明 即在同步订阅情况中,被观察者 通过 FlowableEmitter.requested...,反向控制的原理是:通过RxJava内部固定调用被观察者线程中的request(n) 从而 反向控制被观察者的发送事件速度 那么该什么时候调用被观察者线程中的request(n) & n 的值该是多少呢

    1.2K10

    RxJava Flowable Processor

    同一个线程生产一个就消费了,不会产生问题,在异步线程中,如果生产者的速度大于消费者的速度,就会产生 Backpressure 问题。...request 这个方法若不调用,下游的 onNext 与 OnComplete 都不会调用。...在异步调用时,RxJava 中有个缓存池,用来缓存消费者处理不了暂时缓存下来的数据,缓存池的默认大小为 128,即只能缓存 128 个事件。...无论 request() 中传入的数字比 128 大或小,缓存池中在刚开始都会存入 128 个事件。如果本身并没有这么多事件需要发送,则不会存 128 个事件。...BehaviorProcessor 发射订阅之前的一个数据和订阅之后的全部数据。如果订阅之前没有值,可以使用默认值。 PublishProcessor 从哪里订阅就从哪里发射数据。

    2.2K20

    Android 中 RxJava 的使用

    操作符 则是把发布者的数据进行处理,再给订阅者 ---- 在发布者和订阅者之间传递的事件总共有三种 onNext(): 发送事件的数据 onCompleted(): 事件队列完结。...RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。 onError(): 事件队列异常。...在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。...需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。...) 背压(backpressure):只有上下游运行在各自的线程中,且上游发射数据速度大于下游接收处理数据的速度时,才会产生背压问题。

    2.2K30

    Android Rxjava :最简单&全面背压讲解 (Flowable)

    Rxjava背压:被观察者发送事件的速度大于观察者接收事件的速度时,观察者内会创建一个无限制大少的缓冲池存储未接收的事件,因此当存储的事件越来越多时就会导致OOM的出现。...总结 :当被观察者发送事件大于128时,观察者抛出异常并终止接收事件,但不会影响被观察者继续发送事件。 4.2.2. BUFFER 把上面例子改为BUFFER类型,执行结果如下: ?...总结 :MISSING就是没有采取背压策略的类型,效果跟Obserable一样。 在设置MISSING类型时,可以配合onBackPressure相关操作符使用,也可以到达上述其他类型的处理效果。...4.4.2 request扩展使用 request还可进行扩展使用,当遇到在接收事件时想追加接收数量(如:通信数据通过几次接收,验证准确性的应用场景),可以通过以下方式进行扩展: Flowable.create...4.5 requested requested 与 request不是同一的函数,但它们都是属于FlowableEmitter类里的方法,那么requested()是有什么作用呢,看看以下例子: Flowable.create

    1.6K20

    RxJava之背压策略

    接着来看drain方法: 上面我们知道在onSubscribe中调用request()设置当前AtomicLong的value值。 void drain() { //......= r不成立,在e == r的判断中,在从onNext过来时empty为false,所以直接跳出 for循环。...我们在上面的drain()中调用a.onNext(o)最终是调用observeOn构建对象中的ObserveOnSubscriber的onNext,即调用runAsync();。...然后当e == limit是,回去调用LatestAsyncEmitter的request(e),而limit是在构造函数中初始化的,值为缓存队列容量Flowable.bufferSize()的 3/4...然后和LatestAsyncEmitter一样,当下游的缓存队列满了之后,即不再放下游发送事件,只是把上游的事件保存在SpscLinkedArrayQueue中,等待下游处理了容量的3/4的事件之后,上游在发送容量的

    80320

    Rx Java 异步编程框架

    但是在ReactiveX中,很多指令可能是并行执行的,之后他们的执行结果才会被观察者捕获,顺序是不确定的。为达到这个目的,你定义一种获取和变换数据的机制,而不是调用一个方法。...可观察对象,在Rx中定义为更强大的Iterable,在观察者模式中是被观察的对象,一旦数据产生或发生变化,会通过某种方式通知观察者或订阅者; Observer 观察者对象,监听 Observable...Schedulers.io():在一组动态更改的线程上运行类 I/O 或阻塞操作。 Schedulers.single():以顺序和 FIFO 方式在单个线程上运行工作。...Reactive Streams 规范在定义发布者和订阅者之间的交互时相对严格,以至于由于某些时间要求和需要通过 Subscription.request (long) 准备无效的请求数量而导致严重的性能损失...根据上面的代码的结果输出中可以看到,当我们调用 subscription.request(n) 方法的时候,会等onSubscribe()中后面的代码执行完成后,才会立刻执行到onNext方法。

    3.1K20

    Rxjava 2.x 源码系列 - 基础框架分析

    > onNext > onComplete,若中途出错了,那调用顺序可能是这样的 onSubscribe > onNext > onError onSubscribe 方法,当我们调用 Observable...onError 方法与 onComplete 方法可以说是互斥的,调用了其中一个方法就不会调用另外一个方法 ---- 源码解析 基本使用 在讲解原理之前,我们先来看一下 Rxjava 的一个基本使用。...subscribeActual 方法,而我们知道在 Observable 类中 subscribeActual 是抽象方法,因此,我们只需要关注其实现类的 subscribeActual 方法。...的 onNext onComplete 方法,而不会调用 onError 方法) 若在调用 onNext 方法的过程中出错,那调用顺序可能是这样的 Observable subcrible > Observable...,包装了 observer,调用 emitter 的相应方法 ,会进而调用 observer 的 onNext onError 方法,而不会调用 onComplete 方法 ) ---- observable

    52620

    RxJava2学习笔记(3)

    Thread.sleep(1000);    注意: onSubscribe 里有一行s.request(2),相当于消费者在订阅时,告诉生产者,只能处理2条记录。...然后跑起来,就真的只有2条输出了: onNext->0 onNext->1 值得一提的是:剩下的消息,虽然消费者不再处理了,但是生产者实际上还会继续发的,大家可以在emitter.onNext(i)这后面...之所以这么设计,大家可以思考一下,因为一个生产者射出来的东西,可能有多个消费者在消费,如果因为某1个消费者说:哎呀,太多了,我消化不了,你赶紧停下!...值为0时,下游就开始报错了,也就是说这时已经达到了消费者的处理极限。...,生产者还是在一直持续不停的发送,但是并没有发射满200次,而是正好等于缓冲区大小128(关于128这个数字,可参考本文最后的参考文章)。

    1.4K60

    Carson带你学Android:手把手带你源码分析RxJava

    Disposable.dispose()),则调用观察者(Observer)的同名方法 = onNext() // 观察者的onNext()的内容 = 使用步骤2中复写内容...类 * 定义:RxJava 内置的一个实现了 Observer 的抽象类 * 作用:扩展Observer 接口 = 新增了2个方法 = * 1. onStart():在还未响应事件前调用...= 步骤1创建被观察者(Observable)时创建的ObservableCreate类 * 即 在订阅时,实际上是调用了步骤1创建被观察者(Observable)时创建的ObservableCreate...源码总结 在步骤1(创建被观察者(Observable))、步骤2(创建观察者(Observer))时,仅仅只是定义了发送的事件 & 响应事件的行为; 只有在步骤3(订阅时),才开始发送事件 & 响应事件...总结 本文主要对 RxJava2 中 的订阅流程进行了源码分析

    36310

    再忆RxJava---背压策略

    未雨绸缪(事情在还没有发生之前做一定的处理),一共有两种 (1)控制被观察者发送事件的速度---反馈控制 (2)控制观察者接收事件的速度---响应式拉取 2.2 亡羊补牢(事情已经发生,如何补救)...,就走onError,没有就往下一个onNext走 (可以先看一下ObserveOnSubscriber的onSubscribe函数,里面有queue的构造,以及sourceMode其实并没有赋值)...queue.offer(t)) {//这个queue就是FlowableObserveOn的构造函数中的prefetch大小的一个队列。...主线程s.request来控制要取多少数据,不设置就永远没有onNext打印出来(有点类似于线程池) 3.2.1 控制被观察者发送事件的速度---反馈控制 由于观察者和被观察者处于不同线程,所以被观察者无法通过...=0,或者drop,直接不管 3.2.2 控制观察者接收事件的速度---响应式拉取 比如发送100,s.request(50),那么也就是说还会有50个在缓存队列里面。

    68020

    Android RxJava:一步步带你源码分析 RxJava

    订阅流程 的使用 2.1 使用步骤 RxJava的订阅流程 使用方式 = 基于事件流的链式调用,具体步骤如下: 步骤1:创建被观察者(Observable)& 定义需发送的事件 步骤2:创建观察者(...Disposable.dispose()),则调用观察者(Observer)的同名方法 = onNext() // 观察者的onNext()的内容 = 使用步骤2中复写内容...类 * 定义:RxJava 内置的一个实现了 Observer 的抽象类 * 作用:扩展Observer 接口 = 新增了2个方法 = * 1. onStart():在还未响应事件前调用...= 步骤1创建被观察者(Observable)时创建的ObservableCreate类 * 即 在订阅时,实际上是调用了步骤1创建被观察者(Observable)时创建的ObservableCreate...源码总结 在步骤1(创建被观察者(Observable))、步骤2(创建观察者(Observer))时,仅仅只是定义了发送的事件 & 响应事件的行为; 只有在步骤3(订阅时),才开始发送事件 & 响应事件

    59210

    kotlin--使用观察者、装饰模式实现响应式风格架构

    封装被观察者 上面代码在创建订阅关系时,直接创建了一个被观察者类的匿名实现,我们可以继续对观察者进行封装,不对外暴露被观察者 1.定义发射器接口 内部使用发射器,来替换直接调用观察者的方法,发射器拥有和观察者相同的一部分方法...目前上流被观察者发送的数据和下流观察者接收的数据是相同类型的,在实际开发中,很可能会对该数据类型进行转换,我们不希望在观察者中进行转换,因为这样的代码结构显得不优雅,而且其转换过程可能是异步的,那么如何在上流就进行转换...数据转换接口实现类 实例化一个观察者,对原来的被观察者进行订阅,并在该观察者方法中使用数据转换函数后,调用外部传入的观察者的方法 /** * 转换后新的被观察者,就是将原来的被观察者装饰了下 */...目前发送数据和接收数据处于同一个线程中,如果想要使得上流发送数据在子线程,只需要包装被观察者的订阅方法,订阅方法目前在下面的地方调用: 它们都继承至ObservableProxy,所以只需要在ObservableProxy...中定义一个线程调度的方法,并在这两个地方调用即可 1.上下流线程调度 由于Java中没有Looper,所以如果不指定下流使用子线程,那么上下流将会在同一线程中执行,在抽象类中,定义两个变量,来表示上流和下流是否使用线程

    56520

    响应式编程调试,FLow的概念设计以及实现

    响应式编程的首要问题 - 不好调试 我们在分析传统代码的时候,在哪里打了断点,就能看到直观的调用堆栈,来搞清楚,谁调用了这个代码,之前对参数做了什么修改,等等。但是在响应式编程中,这个问题就很麻烦。...响应式编程 - Flow 的理解 之前说过 FLow 是 Java 9 中引入的响应式编程的抽象概念,对应的类就是:java.util.concurrent.Flow Flow 是一个概念类,其中定义了三个接口供实现...当Publisher判断不会有新的 item 或者异常发生的时候,就会调用onComplete告诉Subscriber消费完成了。大体上就是这么个流程。...) 在最后完成的时候,onComplete会被调用,如果说遇到了异常,那么onError会被调用,就不会调用onComplete了 这些方法其实都是Subscriber的方法,Subscriber是Flux...的订阅者,配置订阅者如何消费以及消费的具体操作。

    2.3K31
    领券