前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Rx Java 异步编程框架

Rx Java 异步编程框架

作者头像
架构探险之道
发布2023-03-04 10:57:20
3K0
发布2023-03-04 10:57:20
举报
文章被收录于专栏:架构探险之道

Rx Java 异步编程框架

  • 名词定义
  • 举个例子
  • 基本概念
    • Backpressure
    • Upstream, Downstream
    • Objects in motion
    • Assembly time
    • Subscription time
    • Runtime
  • 特性
    • Simple background computation
    • Schedulers
    • Concurrency within a flow
    • Parallel processing
    • Dependent sub-flows
    • Continuations
    • Type conversions
    • Operator naming conventions
    • R8 and ProGuard settings
  • 依赖
  • 核心类
    • Flowable
    • Observable
    • Single
    • Completable
    • Maybe
  • 调度器
  • 创建操作
    • create
    • defer
    • range
  • 变换操作
    • flatMap
    • take
    • filter
    • doOnNext
  • 错误处理
  • 总结
  • REFERENCES

在很多软件编程任务中,或多或少你都会期望你写的代码能按照编写的顺序,一次一个的顺序执行和完成。但是在ReactiveX中,很多指令可能是并行执行的,之后他们的执行结果才会被观察者捕获,顺序是不确定的。为达到这个目的,你定义一种获取和变换数据的机制,而不是调用一个方法。在这种机制下,存在一个可观察对象(Observable),观察者(Observer)订阅(Subscribe)它,当数据就绪时,之前定义的机制就会分发数据给一直处于等待状态的观察者哨兵。

这种方法的优点是,如果你有大量的任务要处理,它们互相之间没有依赖关系。你可以同时开始执行它们,不用等待一个完成再开始下一个(用这种方式,你的整个任务队列能耗费的最长时间,不会超过任务里最耗时的那个)。

有很多术语可用于描述这种异步编程和设计模式,在在本文里我们使用这些术语:一个观察者订阅一个可观察对象 (An observer subscribes to an Observable)。通过调用观察者的方法,Observable发射数据或通知给它的观察者。

在其它的文档和场景里,有时我们也将Observer叫做SubscriberWatcherReactor。这个模型通常被称作Reactor模式

名词定义

这里给出一些名词的翻译

  • Reactive 直译为反应性的,有活性的,根据上下文一般翻译为反应式、响应式;
  • Iterable 可迭代对象,支持以迭代器的形式遍历,许多语言中都存在这个概念;
  • Observable 可观察对象,在Rx中定义为更强大的Iterable,在观察者模式中是被观察的对象,一旦数据产生或发生变化,会通过某种方式通知观察者或订阅者;
  • Observer 观察者对象,监听 Observable 发射的数据并做出响应,Subscriber 是它的一个特殊实现;
  • emit 直译为发射,发布,发出,含义是 Observable 在数据产生或变化时发送通知给 Observer,调用 Observer 对应的方法,文章里一律译为发射;
  • items 直译为项目,条目,在Rx里是指Observable发射的数据项,文章里一律译为数据,数据项;

举个例子

响应式编程

代码语言:javascript
复制
    /**
     * Rx 测试
     *
     * @param args
     */
    public static void main(String[] args) {
        // Demo1
        Flowable.just("Hello world!").subscribe(System.out::println);
        // Demo2 数据消费
        Observable.just("Hello World!").subscribe(System.out::println);
        // Demo3 数据中间处理
        Observable.just("Hello World!")
                .map(p -> p + " Master!")
                .subscribe(System.out::println);
        // Demo4 输出流数据转换
        Observable.just("Hello World!")
                .map(p -> p.hashCode())
                .subscribe(System.out::println);
        Observable.just("Hello World!")
                .map(p -> Base64Utils.encodeToString(p.getBytes()))
                .map(p -> new String(Base64Utils.decodeFromString(p)))
                .subscribe(System.out::println);
    }

输出结果:

代码语言:javascript
复制
Hello world!
Hello World!
Hello World! Master!
-969099747
Hello World!

基本概念

Backpressure

在管道运输中,气流或液流由于管道突然变细、急弯等原因导致由某处出现了下游向上游的逆向压力,这种情况称作「back pressure」。这是一个很直观的词:向后的、往回的压力。 在数据流从上游生产者向下游消费者传输的过程中,上游生产速度大于下游消费速度,导致下游的 Buffer 溢出,这种现象就叫做 Backpressure 出现。 在 RxJava 中反压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。

  • 反压现象的一个前提是异步环境,也就是说,被观察者和观察者处在不同的线程环境中。
  • 反压(Backpressure)并不是一个像 flatMap 一样可以在程序中直接使用的操作符,他只是一种控制事件流速的策略。
  • 生产速度大于消费速度,所以需要 Buffer;
  • 外部条件有限制,所以 Buffer 需要有上限;
  • Buffer 达到上限这个现象,有一个简化的等价词叫做 Backpressure;
  • Backpressure 的出现其实是一种危险边界,唯一的选择是丢弃新事件。

Backpressure 指的是在 Buffer 有上限的系统中,Buffer 溢出的现象;它的应对措施只有一个:丢弃新事件。

当数据流通过异步步骤运行时,每个步骤可以以不同的速度执行不同的操作。

为了避免压倒性的这些步骤,这些步骤通常表现为由于临时缓冲需要跳过/删除数据而增加的内存使用,所谓的反压被应用,这是一种流控制形式,其中的步骤可以表示它们准备处理多少项。

这允许限制数据流的内存使用,因为通常没有办法让步骤知道上游将向它发送多少条目。

在 RxJava 中,专用的 Flowable 类被指定用于支持反压,Observable 专用于非反压操作(短序列、 GUI 交互等)。其他类型,Single, MaybeCompletable不支持反压,也不应该,总是有空间暂时存储一个元素。

Upstream, Downstream

上游、下游: RxJava 中的数据流包括一个源、零个或多个中间步骤,然后是数据消费者或组合子步骤(其中该步骤负责通过某种方式使用数据流) :

代码语言:javascript
复制
source.operator1()
    .operator2()
    .operator3()
    .subscribe(consumer);
source.flatMap(value -> source.operator1().operator2().operator3());

在这里,如果我们把自己想象成操作者2,向左看向源头,我们称之为上游。向右看向订阅者/使用者称为下游。

Objects in motion

运动中的物体: 在 RxJava 的文档中,emission、 emission、 item、 event、 signal、 data 和 message 被认为是同义词,表示沿着数据流传输的对象。

Assembly time

装配时间:

通过应用各种中间操作符来准备数据流的过程发生在所谓的组装时间:

代码语言:javascript
复制
Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0);

在这一点上,数据还没有流动,也没有发生副作用。

Subscription time

订阅时间:

这是对在内部建立处理步骤链的流调用 subscribe () 时的临时状态:

代码语言:javascript
复制
flow.subscribe(System.out::println)

这时会触发订阅副作用(请参见 doOnSubscribe)。在这种状态下,某些源会立即阻塞或开始发送项。

Runtime

运行时:

这是当流处于主动发出元素、错误或完成信号时的状态:

代码语言:javascript
复制
Observable.create(emitter -> {
     while (!emitter.isDisposed()) {
         long time = System.currentTimeMillis();
         emitter.onNext(time);
         if (time % 2 != 0) {
             emitter.onError(new IllegalStateException("Odd millisecond!"));
             break;
         }
     }
})
.subscribe(System.out::println, Throwable::printStackTrace);

实际上,这是在执行上面给定示例的主体时触发的。

特性

Simple background computation

简单的背景计算:

RxJava 的一个常见用例是在后台线程上运行一些计算、网络请求,并在 UI 线程上显示结果(或错误) :

代码语言:javascript
复制
import io.reactivex.rxjava3.schedulers.Schedulers;

Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- wait for the flow to finish

这种类型的链接方法称为 fluent API,类似于构建器模式。但是,RxJava 的反应类型是不可变的;每个方法调用都返回一个带有添加行为的新 Flowable。为了说明,这个例子可以重写如下:

代码语言:javascript
复制
Flowable<String> source = Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
});

Flowable<String> runBackground = source.subscribeOn(Schedulers.io());

Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());

showForeground.subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000);

通常,您可以通过 subscribeOn 将计算或阻塞 IO 移动到其他线程。一旦数据准备就绪,您可以确保通过 observeOn 在前台或 GUI 线程上对它们进行处理。

Schedulers

调度器: RxJava 操作符不直接与线程或 ExecutorServices 一起工作,而是与所谓的Scheduler 一起工作,这些有用的类来自统一的 API. RxJava 3 并发抽象,其提供了几个标准的调度器。

  • Schedulers.computation():在后台固定数量的专用线程上运行计算密集型工作。大多数异步操作符都将此作为默认值Scheduler
  • Schedulers.io():在一组动态更改的线程上运行类 I/O 或阻塞操作。
  • Schedulers.single():以顺序和 FIFO 方式在单个线程上运行工作。
  • Schedulers.trampoline():在一个参与的线程中,以顺序和 FIFO 的方式运行工作,通常是为了测试目的。

此外,还有一个选项可以通过 Scheduler 将现有的 Executor (及其子类型,如 ExecutorService)封装到 Scheduler 中Schedulers.from(Executor)。例如,可以使用它来拥有一个更大但仍然固定的线程池(分别与 calculation() 和 io() 不同)。

代码语言:javascript
复制
Observable.just("java", "C++", "python")
        //.observeOn(Schedulers.io())
    .observeOn(Schedulers.from(Executors.newFixedThreadPool(5)))
    .subscribe(System.out::println);

Thread.sleep (2000) ;最后并非偶然。在 RxJava 中,默认的调度程序运行在守护线程上,这意味着一旦 Java 主线程退出,它们就全部停止,后台计算可能永远不会发生。在这个示例情况中,休眠一段时间可以让您在控制台上看到流的输出,并节省时间。

Concurrency within a flow

流中的并发性:

在 RxJava 中,流本质上是连续的,可以被分割成可以并发运行的处理阶段:

代码语言:javascript
复制
Flowable.range(1, 10)
  .observeOn(Schedulers.computation())
  .map(v -> v * v)
  .blockingSubscribe(System.out::println);

此示例在计算用的调度器 Scheduler 上将数字从1平方到10,并在“主”线程(更准确地说,blockingSubscribe 的调用方线程)上消费结果。但是,lambda v-> v * v 对于这个流并不是并行运行;它在同一个计算线程上一个接一个地接收值1到10。

Parallel processing

并行处理:

并行处理数字1到10的过程要稍微复杂一些:

代码语言:javascript
复制
Flowable.range(1, 10)
  .flatMap(v ->
      Flowable.just(v)
        .subscribeOn(Schedulers.computation())
        .map(w -> w * w)
  )
  .blockingSubscribe(System.out::println);

实际上,RxJava 中的并行性意味着运行独立的流并将它们的结果合并回单个流。运算符 flatMap 首先将每个数字从1到10映射到它自己的 Flowable,然后运行它们并合并计算出的平方。

但是请注意,flatMap 并不保证任何顺序,内部流中的项可能最终交叉存取。还有一些替代操作符:

  • concatMap:它一次映射并运行一个内部流程。
  • concatMapEager:它“同时”运行所有内部流,但输出流将按照内部流创建的顺序进行。

另外,Flowable.parallel () 操作符和 ParallelFlowable 类型有助于实现相同的并行处理模式:

代码语言:javascript
复制
Flowable.range(1, 10)
  .parallel()
  .runOn(Schedulers.computation())
  .map(v -> v * v)
  .sequential()
  .blockingSubscribe(System.out::println);

Dependent sub-flows

依赖子流:

flatMap 是一个强大的操作符,在很多情况下都有帮助。例如,给定一个返回 Flowable 的服务,我们希望调用另一个服务,其值由第一个服务发出:

代码语言:javascript
复制
Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();

inventorySource
    .flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId())
            .map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand))
    .subscribe(System.out::println);

Continuations

延续: 有时候,当一个项变得可用时,需要在它上面执行一些依赖的计算。这有时被称为延续,并且根据应该发生什么以及涉及到什么类型,可能需要使用各种操作符来完成。

Dependent

依赖:

最典型的场景是给定一个值,调用另一个服务,等待并继续其结果:

代码语言:javascript
复制
service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))

通常情况下,后面的序列也需要早期映射的值。这可以通过将外部 flatMap 移动到先前 flatMap 的内部来实现,例如:

代码语言:javascript
复制
service.apiCall()
.flatMap(value ->
    service.anotherApiCall(value)
    .flatMap(next -> service.finalCallBoth(value, next))
)

在这里,原始值将在内部 flatMap 中可用,由 lambda 变量捕获提供。

Non-dependent

非依赖性的:

在其他场景中,第一个源/数据流的结果是不相关的,人们希望继续使用准独立的另一个源。在这里,flatMap 也可以工作:

代码语言:javascript
复制
Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
  .subscribe(System.out::println, Throwable::printStackTrace);

然而,继续在这种情况下保持 Observable 而不是可能更合适的 Single。(这是可以理解的,因为从 flatMapSingle 的角度来看,sourceObservable 是一个多值源,因此映射也可能导致多个值)。

不过通常有一种方法可以使用 Completable 作为中介,通过它的操作符 andThen 来继续做一些事情:

代码语言:javascript
复制
sourceObservable
  .ignoreElements()           // returns Completable
  .andThen(someSingleSource)
  .map(v -> v.toString())

sourceObservablesomeSingleSource 之间的唯一依赖性是前者应该正常完成,以便后者被使用。

Deferred-dependent

依赖性递延:

有时,在前一个序列和新序列之间存在一个隐式的数据依赖关系,由于某种原因,这个依赖关系没有通过“常规通道”。人们倾向于写下这样的延续:

代码语言:javascript
复制
AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.just(count.get()))
  .subscribe(System.out::println);

不幸的是,当数据流甚至还没有运行时,因为 Single.just(count.get()) 将在assembly time时计算,所以这个命令将打印 0。我们需要一些东西,推迟这个Single源的计算,直到主源完成运行时:

代码语言:javascript
复制
AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.defer(() -> Single.just(count.get())))
  .subscribe(System.out::println);

// 或者

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.fromCallable(() -> count.get()))
  .subscribe(System.out::println);

Type conversions

类型转换: 有时候,源或服务返回的类型与应该与其一起工作的流不同。例如,在上面的库存示例中,getDemandAsync 可以返回 Single<DemandRecord> 。如果代码示例保持不变,将导致编译时错误(然而,通常会出现关于缺少重载的误导性错误消息)。 在这种情况下,通常有两个选项来修复转换: 1) 转换为所需的类型; 2) 查找并使用支持不同类型的特定运算符的重载。

Converting to the desired type

转换为所需的类型 每个反应性基类都包含能够执行此类转换(包括协议转换)以匹配其他类型的操作符。下表显示了现有的转换选项:

Flowable

Observable

Single

Maybe

Completable

Flowable

toObservable

first, firstOrError, single, singleOrError, last, lastOrError1

firstElement, singleElement, lastElement

ignoreElements

Observable

toFlowable2

first, firstOrError, single, singleOrError, last, lastOrError1

firstElement, singleElement, lastElement

ignoreElements

Single

toFlowable3

toObservable

toMaybe

ignoreElement

Maybe

toFlowable3

toObservable

toSingle

ignoreElement

Completable

toFlowable

toObservable

toSingle

toMaybe

  • 将多值源转换为单值源时,应该决定将众多源值中的哪一个作为结果。
  • Observable 转化为 Flowable 需要额外的决定:如何处理 Observable 源的潜在无约束流?有几个策略可用(如缓冲,下降,保持最新)通过 BackpressureStrategy 参数或通过标准 Flowable 操作符,如 onBackpressureBufferonBackpressureDroponBackpressureLatest,这也允许进一步定制的反压行为。
  • 当只有(最多)一个来源项目,这是一个没有问题的反压,因为它可以始终存储,直到下游准备消费。
Using an overload with the desired type

使用所需类型的重载: 许多经常使用的操作符具有可以处理其他类型的重载。它们通常以目标类型的后缀命名:

Operator

Overloads

flatMap

flatMapSingle, flatMapMaybe, flatMapCompletable, flatMapIterable

concatMap

concatMapSingle, concatMapMaybe, concatMapCompletable, concatMapIterable

switchMap

switchMapSingle, switchMapMaybe, switchMapCompletable

这些运算符使用后缀而不是简单地使用相同的名称和不同的签名的原因是类型消除。Java 并不认为操作符 operator(Function<T, Single<R>>) 和操作符 operator(Function<T, Maybe<R>>) 是不同的(不像 c #) ,而且由于泛型信息的擦除,这两个操作符最终会成为具有相同签名的重复方法。

Operator naming conventions

运算符命名约定: 在编程中,命名是最困难的事情之一,因为名字被认为不应该太长,表达力强,捕捉力强,容易记住。不幸的是,目标语言(以及预先存在的约定)在这方面可能不会提供太多帮助(不可用的关键字、类型擦除、类型歧义等)。

Unusable keywords

无法使用的关键字 在原始的 Rx.NET 中,发出一个条目然后完成的操作符叫做 Return (t)。由于 Java 约定是以小写字母开头的方法名称,所以这将是 return (t) ,它是 Java 中的一个关键字,因此不可用。因此,RxJava 选择将这个操作符命名为 just(T)。运算符 Switch 也存在同样的限制,它必须命名为 switchOnNext。另一个例子是 Catch,它命名为 onErrorResumeNext

Type erasure

类型擦除:

许多期望用户提供返回反应类型的函数的操作符不能重载,因为围绕 Function<T, X> 的类型擦除将这些方法签名转换为重复类型。选择通过添加类型作为后缀来命名这些操作符:

代码语言:javascript
复制
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)

Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)
Type ambiguities

类型歧义:

尽管某些运算符在类型擦除方面没有问题,但它们的签名可能会变得模棱两可,特别是如果使用 java8 和 lambdas。例如,concatWith 使用各种其他反应性基类型作为参数(为了在底层实现中提供方便性和性能优势) ,存在几个重载:

代码语言:javascript
复制
Flowable<T> concatWith(Publisher<? extends T> other);

Flowable<T> concatWith(SingleSource<? extends T> other);

Publisher 和 SingleSource 都显示为函数接口(带有一个抽象方法的类型) ,并可能鼓励用户尝试提供一个 lambda 表达式:

代码语言:javascript
复制
someSource.concatWith(s -> Single.just(2))
.subscribe(System.out::println, Throwable::printStackTrace);

不幸的是,这种方法不起作用,而且示例根本不会打印2。事实上,自2.1.10版本以来,它甚至不能编译,因为至少存在4个 concatWith 重载,并且编译器会发现上面的代码不明确。

在这种情况下,用户可能希望推迟一些计算,直到 someSource 完成,因此正确的无歧义运算符应该推迟:

代码语言:javascript
复制
someSource.concatWith(Single.defer(() -> Single.just(2)))
.subscribe(System.out::println, Throwable::printStackTrace);

有时候,添加后缀是为了避免逻辑上的歧义,这样可能会编译,但在流程中产生错误的类型:

代码语言:javascript
复制
Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> mergeArray(Publisher<? extends T>... sources);

当函数接口类型作为类型参数 T 参与时,这也可能会变得模糊不清。

Error handling

错误处理:

数据流可能会失败,此时错误会发送到消费者。不过有时候,多个源可能会失败,在这个时候可以选择是否等待所有源完成或失败。为了表示这个机会,许多操作符名称都以 DelayError 字符作为后缀(而其他操作符的重载中包含 DelayErrordelayErrors 布尔标志) :

代码语言:javascript
复制
Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);

当然,各种后缀可以一起出现:

代码语言:javascript
复制
Flowable<T> concatArrayEagerDelayError(Publisher<? extends T>... sources);
Base class vs base type

基类与基类型: 由于基类上的静态方法和实例方法的数量很多,可以认为基类很多。RxJava 3 的设计受到 Reactive Streams 规范的严重影响,因此,该库为每个 Reactive 类型提供了一个类和一个接口:

Type

Class

Interface

Consumer

0..N backpressured

Flowable

Publisher1

Subscriber

0..N unbounded

Observable

ObservableSource2

Observer

1 element or error

Single

SingleSource

SingleObserver

0..1 element or error

Maybe

MaybeSource

MaybeObserver

0 element or error

Completable

CompletableSource

CompletableObserver

  • org.reactivestreams.Publisher 是外部 Reactive Streams 库的一部分。它是通过由 Reactive Streams 规范管理的标准化机制与其他响应式编程库交互的主要类型。
  • 接口的变数命名原则是在半传统的类名后面附加 Source。由于 Publisher 是由 Reactive Streams 库提供的,因此不存在 FlowableSource (子类型化也不会有助于互操作)。但是,这些接口并不是 Reactive Streams 规范意义上的标准接口,目前只是 RxJava 特定的接口。

R8 and ProGuard settings

R8 和 ProGuard 设置

默认情况下,RxJava 本身不需要任何 ProGuard/R8 设置,应该可以毫无问题地工作。不幸的是,Reactive Streams 自1.0.3 版本以来的依赖性已经在其 JAR 中嵌入了 Java 9 类文件,这些文件可能会使用普通的 ProGuard 导致警告:

代码语言:javascript
复制
Warning: org.reactivestreams.FlowAdapters$FlowPublisherFromReactive: can't find superclass or interface java.util.concurrent.Flow$Publisher
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveProcessor: can't find superclass or interface java.util.concurrent.Flow$Processor
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber: can't find superclass or interface java.util.concurrent.Flow$Subscriber
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscription: can't find superclass or interface java.util.concurrent.Flow$Subscription
Warning: org.reactivestreams.FlowAdapters: can't find referenced class java.util.concurrent.Flow$Publisher

建议在应用程序的 proguard-ruleset 文件中设置以下-dontwarn 条目:

-dontwarn java.util.concurrent.Flow*

对于 R8,RxJava jar 包含 META-INF/proguard/rxjava3.pro 和相同的 no-warning 子句,这些应该是自动应用的。

依赖

maven

代码语言:javascript
复制
<dependency>
    <groupId>io.reactivex.rxjava3</groupId>
    <artifactId>rxjava</artifactId>
    <version>x.y.z</version>
</dependency>

Gradle

代码语言:javascript
复制
implementation 'io.reactivex.rxjava3:rxjava:x.y.z'

官方 API Docs (此处以3.0.8版本为例)

http://reactivex.io/RxJava/3.x/javadoc/3.0.8/

核心类

Flowable

io.reactivex.rxjava3.core.Flowable

  • 流数目:0~N(发送0~N个的数据)
  • 支持反应流和反压
代码语言:javascript
复制
public abstract class Flowable<@NonNull T> implements Publisher<T>

这个 类来实现 Reactive Streams 与 Flowable 扩展的 Publishers 一起运作。因此,许多中间的 操作符直接接受通用 Publishers,并允许与其他 Reactive Streams 实现直接互操作。

Flowable 为操作符提供 128 个元素的默认缓冲区大小,可以通过 bufferSize () 访问,可以通过系统参数 rx3.buffer-size 全局重写。但是,大多数操作符都有允许显式设置其内部缓冲区大小的重载。

代码语言:javascript
复制
  /**
     * Flowable 基类测试
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        Disposable d = Flowable.just("Hello world!")
                .delay(1, TimeUnit.SECONDS)
                .subscribeWith(new DisposableSubscriber<String>() {
                    @Override
                    public void onStart() {
                        System.out.println("Start!");
                        request(1);
                    }
                    @Override
                    public void onNext(String t) {
                        System.out.println(t);
                        request(1);
                    }
                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();
                    }
                    @Override
                    public void onComplete() {
                        System.out.println("Done!");
                    }
                });
        Thread.sleep(5000);
        // the sequence can now be cancelled via dispose()
        d.dispose();
    }

输出:

代码语言:javascript
复制
> Task :rx-java-examples:rx-java-chapter-1:FlowableTest.main()
Start!
Hello world!
Done!
  • Reactive Streams 规范在定义发布者和订阅者之间的交互时相对严格,以至于由于某些时间要求和需要通过 Subscription.request (long) 准备无效的请求数量而导致严重的性能损失。
  • 因此,RxJava 引入了 FlowableSubscriber 接口,它表明消费者可以使用放松的规则来驱动。所有的 RxJava 操作符都是根据这些宽松的规则实现的。如果订阅的 Subscriber 没有实现此接口,例如,由于它来自另一个 Reactive Streams 兼容库,Flowable 将自动在其周围应用一个兼容包装。
  • Flowable 是一个抽象类,但是由于要严格遵循大量的 Reactive Streams 规则,不建议通过直接扩展类来实现源和自定义操作符。如果需要这样的自定义实现,请参阅 wiki 获得一些指导。
  • 建议使用 create (FlowableOnSubscribe,backpresssurestrategy) factory 方法来创建自定义 Flowables:
代码语言:javascript
复制
    /**
     * 工厂方法创建 Flowable
     * @param args
     */
    public static void main(String[] args) {
        Flowable<String> source = Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {

                // signal an item
                emitter.onNext("Hello");

                // could be some blocking operation
                Thread.sleep(1000);

                // the consumer might have cancelled the flow
                if (emitter.isCancelled()) {
                    return;
                }

                emitter.onNext("World");

                Thread.sleep(1000);

                // the end-of-sequence has to be signaled, otherwise the
                // consumers may never finish
                emitter.onComplete();
            }        
        }, BackpressureStrategy.BUFFER); // 指定反压策略

        System.out.println("Subscribe!");

        source.subscribe(System.out::println);

        System.out.println("Done!");
    }

输出:

代码语言:javascript
复制
> Task :rx-java-examples:rx-java-chapter-1:FlowableTest2.main()
Subscribe!
Hello
World
Done!

作为 RxJava 响应源,例如 Flowable,通常本质上是同步的和有序的。在 ReactiveX 设计中,操作符运行的位置(线程)与操作符可以处理数据的位置正交。这意味着异步和并行性必须通过 subscribeOn (Scheduler)observeOn (Scheduler)parallel ()等操作符显式表示。通常,带有 Scheduler 参数的操作符将这种类型的异步引入到流中。

Flowable虽然可以通过create()来创建,但是你必须指定反压的策略,以保证你创建的Flowable是支持反压的。

一般而言,上游的被观察者会响应下游观察者的数据请求,下游调用 request(n) 来告诉上游发送多少个数据。这样避免了大量数据堆积在调用链上,使内存一直处于较低水平。

代码语言:javascript
复制
Flowable.range(0, 10)
                .subscribe(new Subscriber<Integer>() {

                    Subscription sub;
     // 当订阅后,会首先调用这个方法,其实就相当于onStart(),
              // 传入的 Subscription s 参数可以用于请求数据或者取消订阅
                    @Override
                    public void onSubscribe(Subscription s) {
                        System.out.println("onSubscribe START");
                        sub = s;
                        s.request(1);
                        try {
                            TimeUnit.SECONDS.sleep(3);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("onSubscribe END");
                    }

                    @Override
                    public void onNext(Integer v) {
                        System.out.println("onNext: " + v);
                        sub.request(1);
                    }

                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("Done!");
                    }
                });

        Thread.sleep(5000);
        // the sequence can now be cancelled via dispose()
    }
// 输出
onSubscribe START
onSubscribe END
onNext: 0
onNext: 1
...
onNext: 8
onNext: 9
Done!

根据上面的代码的结果输出中可以看到,当我们调用 subscription.request(n) 方法的时候,会等onSubscribe()中后面的代码执行完成后,才会立刻执行到onNext方法。

尽可能确保在request()之前已经完成了所有的初始化工作,否则就有空指针的风险。

Observable

在RxJava中,一个实现了Observer接口的对象可以订阅(subscribe)一个Observable 类的实例。订阅者(subscriber)对Observable发射(emit)的任何数据或数据序列作出响应。这种模式简化了并发操作,因为它不需要阻塞等待Observable发射数据,而是创建了一个处于待命状态的观察者哨兵,哨兵在未来某个时刻响应Observable的通知。 io.reactivex.rxjava3.core.Observable

  • 流数目:0~N (发送0~N个的数据)
  • 无反压

在这里插入图片描述

ReactiveX真正强大的地方在于它的操作符,操作符让你可以变换、组合、操纵和处理Observable发射的数据。 Rx的操作符让你可以用声明式的风格组合异步操作序列,它拥有回调的所有效率优势,同时又避免了典型的异步系统中嵌套回调的缺点。

just

将一个或多个对象转换成发射这个或这些对象的一个 Observable

代码语言:javascript
复制
 public static void main(String[] args) {
        System.out.println("DEMO 1 ---------------");
        Observable.just(1, 2, 3)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        d.dispose();
                        System.out.println("Cancel !");
                    }
                    @Override
                    public void onNext(Integer item) {
                        System.out.println("Next: " + item);
                    }
                    @Override
                    public void onError(Throwable error) {
                        System.err.println("Error: " + error.getMessage());
                    }
                    @Override
                    public void onComplete() {
                        System.out.println("Sequence complete.");
                    }
                });
        System.out.println("DEMO 2 ---------------");
        Observable.just(1, 2, 3)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        System.out.println("onSubscribe ! + " + d.isDisposed());
                    }
                    @Override
                    public void onNext(Integer item) {
                        System.out.println("Next: " + item);
                    }
                    @Override
                    public void onError(Throwable error) {
                        System.err.println("Error: " + error.getMessage());
                    }
                    @Override
                    public void onComplete() {
                        System.out.println("Sequence complete.");
                    }
                });
    }
// 输出
DEMO 1 ---------------
Cancel !
DEMO 2 ---------------
onSubscribe ! + false
Next: 1
Next: 2
Next: 3
Sequence complete
fromArray

将一个数组转换成一个Observable 相似的还有 fromIterable、fromFuture 可以将一个Iterable, 一个Future转换为一个Observable

代码语言:javascript
复制
  // 数组
  System.out.println("DEMO 3 ---------------");
        Integer[] items = {0, 1, 2, 3, 4, 5};
        Observable myObservable = Observable.fromArray(items);
        myObservable.subscribe(next -> System.out.println(next),
                error -> System.out.println(error),
                () -> System.out.println("DONE!")
        );
  // 异常处理
        System.out.println("DEMO 4 ---------------");
        Integer[] items2 = {0, 1, null, 3, 4, 5};
        myObservable = Observable.fromArray(items2);
        myObservable.subscribe(next -> System.out.println(next),
                error -> System.out.println(error),
                () -> System.out.println("DONE!")
        );
  // 迭代器
  System.out.println("DEMO 5 ---------------");
        Integer[] items3 = {0, 1, 2, 3, 4, 5};
        myObservable = Observable.fromIterable(Arrays.asList(items3));
        myObservable.subscribe(next -> System.out.println(next),
                error -> System.out.println(error),
                () -> System.out.println("DONE!")
        );
  // Future
  System.out.println("DEMO 6 ---------------");
        FutureTask<Long> task = new FutureTask<>(() -> System.currentTimeMillis());
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(task);
        try {
            System.out.println("get:" + task.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
        //executorService.shutdown(); 移至最后执行
        myObservable = Observable.fromFuture(task);
        myObservable.subscribe(next -> System.out.println("RECIEVE: " + next),
                error -> System.out.println(error),
                () -> System.out.println("DONE!")
        );
  // Future 超时
  System.out.println("DEMO 7 ---------------");
        FutureTask<Long> task2 = new FutureTask<>(() -> {
            TimeUnit.SECONDS.sleep(3);
            return System.currentTimeMillis();
        });
        myObservable = Observable.fromFuture(task2, 1, TimeUnit.SECONDS);//等待1s
        executorService.submit(task2);
        myObservable.subscribe(next -> System.out.println("RECIEVE WITHIN 1s: " + next),
                error -> System.out.println(error),
                () -> System.out.println("DONE!")
        );
        System.out.println("DEMO 8 ---------------");
        task2 = new FutureTask<>(() -> {
            TimeUnit.SECONDS.sleep(3);
            return System.currentTimeMillis();
        });
        myObservable = Observable.fromFuture(task2, 4, TimeUnit.SECONDS);//等待4s
        executorService.submit(task2);
        myObservable.subscribe(next -> System.out.println("RECIEVE WITHIN 4s: " + next),
                error -> System.out.println(error),
                () -> System.out.println("DONE!")
        );
        System.out.println("DEMO 9 ---------------");
        task2 = new FutureTask<>(() -> {
            TimeUnit.SECONDS.sleep(3);
            return System.currentTimeMillis();
        });
        myObservable = Observable.fromFuture(task2, 1, TimeUnit.SECONDS);//等待1s
        executorService.submit(task2);
        myObservable.subscribe(next -> System.out.println("RECIEVE WITHIN 1s: " + next),
                error -> System.out.println(error),
                () -> System.out.println("DONE!")
        );
        System.out.println("DEMO 10 ---------------");
        task2 = new FutureTask<>(() -> {
            TimeUnit.SECONDS.sleep(3);
            return System.currentTimeMillis();
        });
        myObservable = Observable.fromFuture(task2, 4, TimeUnit.SECONDS);//等待4s
        executorService.submit(task2);
        myObservable.subscribe(next -> System.out.println("RECIEVE WITHIN 4s: " + next),
                error -> System.out.println(error),
                () -> System.out.println("DONE!")
        );
        executorService.shutdown();
// 输出
DEMO 3 ---------------
0
1
2
3
4
5
DONE!
DEMO 4 ---------------
0
1
java.lang.NullPointerException: The element at index 2 is null
DEMO 5 ---------------
0
1
2
3
4
5
DONE!
DEMO 6 ---------------
get:1608780273323
RECIEVE: 1608780273323
DONE!
DEMO 7 ---------------
java.util.concurrent.TimeoutException
DEMO 8 ---------------
RECIEVE WITHIN 4s: 1608781164556
DONE!
DEMO 9 ---------------
java.util.concurrent.TimeoutException
DEMO 10 ---------------
RECIEVE WITHIN 4s: 1608781168578
DONE!
repeat

重复发送数据

代码语言:javascript
复制
 System.out.println("DEMO 1 ---------------");
        Integer[] items3 = {0, 1, 2};
        Observable.fromIterable(Arrays.asList(items3))
                //.repeat() // 一直重复发送
                .repeat(2) // 重复2次
                .subscribe(next -> System.out.println(next),
                        error -> System.out.println(error),
                        () -> System.out.println("DONE!")
                );
        System.out.println("DEMO 2 ---------------");
        Observable<Integer> observable = Observable.create(sub -> {
            System.out.println("START DEMO 2");
            sub.onNext(0);
            sub.onNext(1);
            sub.onComplete();
        });
        observable
                //.repeat() // 一直重复发送
                .map(p -> p * 2)
                .repeat(2) // 重复2次
                .map(p -> "modified: " + p)
                .repeat(2) // 重复2次
                .subscribe(next -> System.out.println(next),
                        error -> System.out.println(error),
                        () -> System.out.println("DONE!")
                );

// 输出
DEMO 1 ---------------
0
1
2
0
1
2
DONE!
DEMO 2 ---------------
START DEMO 2
modified: 0
modified: 2
START DEMO 2
modified: 0
modified: 2
START DEMO 2
modified: 0
modified: 2
START DEMO 2
modified: 0
modified: 2
DONE!
  • repeat 操作符在 Observable 源序列完成时重新订阅 Observable 源(参见 DEMO2)。
  • repeat 操作符重复整个序列重新订阅观察,而不是重复上一个映射操作符,并且在序列重复操作符中使用的位置无关紧要(参见 DEMO2)。
  • repeat 多次执行,最终重复数目等于其重复次数的积
代码语言:javascript
复制
 .repeat(2) // 重复2次
 .repeat(2) // 重复2次
// 相当于:
 .repeat(2*2=4) // 重复4次

Single

RxJava(以及它派生出来的RxGroovy和RxScala)中有一个名为 Single 的Observable变种。 Single类似于Observable,不同的是,它总是只发射一个值,或者一个错误通知,而不是发射一系列的值。 因此,不同于Observable需要三个方法onNext, onError, onCompleted,订阅Single只需要两个方法:

  • onSuccess:Single发射单个的值到这个方法
  • onError:如果无法发射需要的值,Single发射一个Throwable对象到这个方法

Single只会调用这两个方法中的一个,而且只会调用一次,调用了任何一个方法之后,订阅关系终止。 io.reactivex.rxjava3.core.Single

  • 流数目:1 (只能发送单个数据或者一个错误)
  • 元素(或错误)

Single也可以组合使用多种操作,一些操作符让你可以混合使用Observable和Single:

操作符

返回值

说明

compose

Single

创建一个自定义的操作符

concat and concatWith

Observable

连接多个Single和Observable发射的数据

create

Single

调用观察者的create方法创建一个Single

error

Single

返回一个立即给订阅者发射错误通知的Single

flatMap

Single

返回一个Single,它发射对原Single的数据执行flatMap操作后的结果

flatMapObservable

Observable

返回一个Observable,它发射对原Single的数据执行flatMap操作后的结果

from

Single

将Future转换成Single

just

Single

返回一个发射一个指定值的Single

map

Single

返回一个Single,它发射对原Single的数据执行map操作后的结果

merge

Single

将一个Single(它发射的数据是另一个Single,假设为B)转换成另一个Single(它发射来自另一个Single(B)的数据)

merge and mergeWith

Observable

合并发射来自多个Single的数据

observeOn

Single

指示Single在指定的调度程序上调用订阅者的方法

onErrorReturn

Single

将一个发射错误通知的Single转换成一个发射指定数据项的Single

subscribeOn

Single

指示Single在指定的调度程序上执行操作

timeout

Single

它给原有的Single添加超时控制,如果超时了就发射一个错误通知

toSingle

Single

将一个发射单个值的Observable转换为一个Single

zip and zipWith

Single

将多个Single转换为一个,后者发射的数据是对前者应用一个函数后的结果

代码语言:javascript
复制
System.out.println("DEMO 1 ----");
        //被观察者
        Single<String> single = Single.create(new SingleOnSubscribe<String>() {
            @Override
            public void subscribe(SingleEmitter<String> e) throws Exception {
                e.onSuccess("test");
                e.onSuccess("test2");//错误写法,重复调用也不会处理,因为只会调用一次
            }
        });
        //订阅观察者SingleObserver
        single.subscribe(new SingleObserver<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                //d.dispose();// 直接中断
                System.out.println("onSubscribe " + d.isDisposed());
            }

            @Override
            public void onSuccess(String s) {
                //相当于onNext和onComplete
                System.out.println("RECIEVE: " + s);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println(e.getMessage());
            }
        });
// 输出
DEMO 1 ----
onSubscribe false
RECIEVE: test

Completable

io.reactivex.rxjava3.core.Completable

  • 流数目:0(没有发送任何数据,但只处理 onComplete 和 onError 事件)
  • 完成(或错误)的信号

如果你的观察者连onNext事件都不关心,可以使用Completable,它只有onComplete和onError两个事件

代码语言:javascript
复制
System.out.println("DEMO 1 -------------");
        Completable.create(sub -> {
            sub.onComplete(); //单一onComplete或者onError
        }).subscribe(new CompletableObserver() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                System.out.println("onSubscribe!");
            }

            @Override
            public void onComplete() {
                System.out.println("completed!");
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println(e.getMessage());
            }
        });
// 输出
DEMO 1 -------------
onSubscribe!
completed!

Maybe

io.reactivex.rxjava3.core.Maybe

  • 流数目:1 (能够发射0或者1个数据,要么成功,要么失败)
  • 元素(或错误)

如果你有一个需求是可能发送一个数据或者不会发送任何数据,这时候你就需要Maybe,它类似于Single和Completable的混合体。 Maybe可能会调用以下其中一种情况(也就是所谓的Maybe):

  • onSuccess或者onError
  • onComplete或者onError

可以看到onSuccess和onComplete是互斥的存在,例子代码如下:

代码语言:javascript
复制
System.out.println("DEMO 1 -------------");
Maybe.<Integer>create(sub -> {
            sub.onSuccess(1);
            //sub.onError(new IllegalAccessException("error!")); 异常直接失败
            sub.onComplete();
        }).subscribe(new MaybeObserver<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onSuccess(@NonNull Integer v) {
                // 发送一个数据时,相当于onNext和onComplete,但不会触发另一个方法onComplete
                System.out.println("onSuccess: " + v);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println(e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
System.out.println("DEMO 2 -------------");
        Maybe.<Integer>create(sub -> {
            //sub.onSuccess(1);
            //sub.onError(new IllegalAccessException("error!")); 异常直接失败
            sub.onComplete();
        }).subscribe(new MaybeObserver<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onSuccess(@NonNull Integer v) {
                System.out.println("onSuccess: " + v);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println(e.getMessage());
            }

            @Override
            public void onComplete() {
                 // 无数据发送时候的onComplete事件
                System.out.println("onComplete");
            }
        });
// 输出
DEMO 1 -------------
onSubscribe
onSuccess: 1
DEMO 2 -------------
onSubscribe
onComplete

使用举例: 下面是 Maybe/MaybeObserver 的普通用法,你可以看到,实际上,这种观察者模式并不用于发送大量数据,而是发送单个数据,也就是说,当你只想要某个事件的结果的时候,你可以用这种观察者模式。

代码语言:javascript
复制
   /**
     * Maybe
     *
     * @param args
     */
    public static void main(String[] args) {
        System.out.println("DEMO 3-------------");
        // 模拟数据统计
        Maybe.<Long>just(count())
                //可能涉及到IO操作,放在子线程
                .subscribeOn(Schedulers.newThread())
                //取回结果:当其它排队的任务完成后,在当前线程排队开始执行
                .observeOn(Schedulers.trampoline())
                .subscribe(new MaybeObserver<Long>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        System.out.println("onSubscribe");
                    }

                    @Override
                    public void onSuccess(@NonNull Long v) {
                        System.out.println("onSuccess: " + v);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        System.out.println(e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        // 无数据发送时候的onComplete事件
                        System.out.println("onComplete");
                    }
                });

        try {
            // 阻塞等待主线程结果
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 模拟数据库数据统计
     * @return
     */
    private static Long count() {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return System.currentTimeMillis();
    }
// 输出
DEMO 3-------------
onSubscribe
onSuccess: 1608794997569

调度器

Scheduler: 如果你想给Observable操作符链添加多线程功能,你可以指定操作符(或者特定的Observable)在特定的调度器(Scheduler)上执行。 某些ReactiveX的Observable操作符有一些变体,它们可以接受一个Scheduler参数。这个参数指定操作符将它们的部分或全部任务放在一个特定的调度器上执行。 使用ObserveOn和SubscribeOn操作符,你可以让Observable在一个特定的调度器上执行,ObserveOn指示一个Observable在一个特定的调度器上调用观察者的onNext, onError和onCompleted方法,SubscribeOn更进一步,它指示Observable将全部的处理过程(包括发射数据和通知)放在特定的调度器上执行。

调度器类型

效果

Schedulers.computation( )

用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量

Schedulers.from(executor)

使用指定的Executor作为调度器

Schedulers.immediate( )

在当前线程立即开始执行任务

Schedulers.io( )

用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器

Schedulers.newThread( )

为每个任务创建一个新线程

Schedulers.trampoline( )

当其它排队的任务完成后,在当前线程排队开始执行

创建操作

create

你可以使用create操作符从头开始创建一个Observable,给这个操作符传递一个接受观察者作为参数的函数,编写这个函数让它的行为表现为一个Observable:恰当的调用观察者的onNext,onError和onComplete方法。 一个形式正确的有限Observable必须尝试调用观察者的onComplete正好一次或者它的onError正好一次,而且此后不能再调用观察者的任何其它方法。

代码语言:javascript
复制
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                if (!emitter.isDisposed()) {
                    int index = 5;
                    while (index-- > 0) {
                        emitter.onNext(index);
                        if (index == 3) {
                            emitter.onError(new IllegalAccessException("ILLEGAL NUMBER!"));
                            return;
                        }
                    }
                    emitter.onComplete();
                }
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(@NonNull Integer v) {
                System.out.println("onNext:" + v);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println(e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
// 输出
onSubscribe
onNext:4
onNext:3
ILLEGAL NUMBER!

defer

defer操作符会一直等待直到有观察者订阅它,然后它使用Observable工厂方法生成一个Observable。它对每个观察者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个Observable,事实上每个订阅者获取的是它们自己的单独的数据序列。 在某些情况下,等待直到最后一分钟(就是知道订阅发生时)才生成Observable可以确保Observable包含最新的数据。

defer

range

Range操作符发射一个范围内的有序整数序列,你可以指定范围的起始和长度。 RxJava将这个操作符实现为range函数,它接受两个参数,一个是范围的起始值,一个是范围的数据的数目。如果你将第二个参数设为0,将导致Observable不发射任何数据(如果设置为负数,会抛异常)。

range

变换操作

flatMap

优化循环、嵌套:

  • FlatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable;
  • FlatMap操作符使用一个指定的函数对原始 Observable 发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的 Observable,然后FlatMap合并这些 Observables 发射的数据,最后将合并后的结果当做它自己的数据序列发射。

mergeMap

代码语言:javascript
复制
query("Hello, world!")
    .flatMap(new Func1<List<String>, Observable<String>>() {
        @Override
        public Observable<String> call(List<String> urls) {
            return Observable.from(urls);
        }
    })
    .subscribe(url -> System.out.println(url));
// lambda 形式更为简单
query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .subscribe(url -> System.out.println(url));
  • flatMap() 返回另一个Observable
  • 核心概念是新的Observable 返回的正是Subscriber所观察的。
  • 不接收List<String>,它接收Observable.fromIterable()返回的一系列的单独的Strings
  • flatMap()能返回任意想要的Observable
代码语言:javascript
复制
    /**
     * 处理一批数据
     *
     * @param args
     */
    public static void main(String[] args) {
        System.out.println("DEMO 1 ----");
        Observable.just(Lists.newArrayList("Hello", "World"))
                .subscribe(vals -> {
                    for (String val : vals) {
                        System.out.println(val);
                    }
                });
        System.out.println("DEMO 2 ----");
        Observable.just(Lists.newArrayList("Hello", "World"))
                .subscribe(vals -> {
                    Observable.fromIterable(vals)
                            .subscribe(val -> System.out.println(val));
                });
        System.out.println("DEMO 3 ----");
        Observable.just(Lists.newArrayList("Hello", "World"))
                .flatMap(vals -> Observable.fromIterable(vals))
                .subscribe(val -> System.out.println(val));
    }
// 输出
> Task :rx-java-examples:rx-java-chapter-1:FlatMapOperatorTest.main()
DEMO 1 ----
Hello
World
DEMO 2 ----
Hello
World
DEMO 3 ----
Hello
World
  • flatMap 处理数据
代码语言:javascript
复制
        System.out.println("DEMO 4 ----");
        Observable.just(Lists.newArrayList("Hello", "World"))
                .flatMap(vals -> Observable.fromIterable(vals))
                .flatMap(val -> Observable.just(val + ">>"))
                .subscribe(val -> System.out.println(val));
// 输出
DEMO 4 ----
Hello>>
World>>

map 和 flatMap 的区别

  • 返回结果不同:map 返回的是结果集,flatMap 返回的是包含结果集的 Observable;
  • 执行顺序不同:map 被订阅时每传递一个事件执行一次 onNext 方法,flatmap 多用于多对多,一对多,再被转化为多个时,一般利用 from/just 进行逐个分发,被订阅时将所有数据传递完毕汇总到一个Observable,然后逐个执行onNext方法,(如果单纯用于一对一转换则和 map 相同);
  • 转换对象的能力不同:
    • map 只能单一转换,单一只的是只能一对一进行转换,指一个对象可以转化为另一个对象但是不能转换成对象数组(map 返回结果集不能直接使用 from/just 再次进行事件分发,一旦转换成对象数组的话,再处理集合/数组的结果时需要利用 for 循环遍历取出,而使用 RxJava 就是为了剔除这样的嵌套结构,使得整体的逻辑性更强。)
    • flatMap 既可以单一转换也可以一对多/多对多转换,flatMap 要求返回 Observable,因此可以再内部进行 from/just 的再次事件分发,逐个取出单一对象;

take

最多发出指定数量的item(如果少于指定数目的标题,它会提前停止)

代码语言:javascript
复制
       System.out.println("DEMO 5 ----");
        Observable.just(Lists.newArrayList("Hello", "World"))
                .flatMap(vals -> Observable.fromIterable(vals))
                .flatMap(val -> Observable.just(val + ">>"))
                .take(1)
                .subscribe(val -> System.out.println(val));
// 输出
DEMO 5 ----
Hello>>

filter

过滤

代码语言:javascript
复制
       System.out.println("DEMO 6 ----");
        Observable.just(Lists.newArrayList("Hello", "World"))
                .flatMap(vals -> Observable.fromIterable(vals))
                .flatMap(val -> Observable.just(val + ">>"))
                .take(2)
                .filter(p->p.contains("ld"))
                .subscribe(val -> System.out.println(val));
// 输出
DEMO 6 ----
World>>

doOnNext

在每次一个item被发出之前,添加额外的行为。

代码语言:javascript
复制
System.out.println("DEMO 7 ----");
        Observable.just(Lists.newArrayList("Hello", "World"))
                .flatMap(vals -> Observable.fromIterable(vals))
                .flatMap(val -> Observable.just(val + ">>"))
                .take(2)
                .filter(p -> p.contains("ld"))
                .doOnNext(p -> System.out.println(">>" + p))
                .subscribe(val -> System.out.println(val));
// 输出
DEMO 7 ----
>>World>>
World>>

错误处理

通过回调函数监听停止发出 items 的操作以及原因: 根据Observable协议的定义,onNext可能会被调用零次或者很多次,最后会有一次onCompleted或onError调用(不会同时),传递数据给onNext通常被称作发射,onCompleted和onError被称作通知。

  • onNext(T item):Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于你的实现。
  • onError(Exception ex):当 Observable 遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出的异常。
  • onComplete:正常终止,如果没有遇到错误,Observable在最后一次调用onNext之后调用此方法。
代码语言:javascript
复制
    // io.reactivex.rxjava3.core.Observable @line 13052
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    @NonNull
    public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError) {
        return subscribe(onNext, onError, Functions.EMPTY_ACTION);
    }
代码语言:javascript
复制
 System.out.println("DEMO 8 ----");
        Observable.just(Lists.newArrayList("Hello", "World"))
                .flatMap(vals -> Observable.fromIterable(vals))
                .flatMap(val -> Observable.just(val + ">>"))
                .take(2)
                .filter(p -> p.contains("ld"))
                .doOnNext(p -> System.out.println(">>" + p))
                .flatMap(FlatMapOperatorTest::nullPointEx)
             // 抛出异常后停止执行
                .flatMap(val -> Observable.just(val + ">> +1"))
                .subscribe(next -> System.out.println("OK: "+ next),
                        error -> System.out.println("RECIEVE: " + error.getMessage()));
        System.out.println("DEMO 9 ----");
        Observable.just(Lists.newArrayList("Hello", "World"))
                .flatMap(vals -> Observable.fromIterable(vals))
                .flatMap(val -> Observable.just(val + ">>"))
                .take(2)
                .filter(p -> p.contains("ll"))
                .doOnNext(p -> System.out.println(">>" + p))
                .flatMap(FlatMapOperatorTest::nullPointEx)
             // 不抛异常的话继续执行
                .flatMap(val -> Observable.just(val + ">> +1"))
                .subscribe(next -> System.out.println("OK: "+ next),
                        error -> System.out.println("RECIEVE: " + error.getMessage()));
 System.out.println("DEMO 10 ----");
        Observable.just(Lists.newArrayList("Hello", "World"))
                .flatMap(vals -> Observable.fromIterable(vals))
                .flatMap(val -> Observable.just(val + ">>"))
                .take(2)
                .filter(p -> p.contains("ll"))
                .doOnNext(p -> System.out.println(">>" + p))
                .flatMap(FlatMapOperatorTest::nullPointEx)
                .flatMap(val -> Observable.just(val + ">> +1"))
                .subscribe(next -> System.out.println("OK: " + next),
                        error -> System.out.println("RECIEVE: " + error.getMessage()),
                        () -> {
                            System.out.println("DONE~!");
                        });

 /**
     * 根据条件抛出异常
     *
     * @param p
     * @return
     */
    private static ObservableSource<?> nullPointEx(String p) {
        if (p.contains("ld")) {
            return Observable.error(new NullPointerException("NULL EX"));
        }
        return Observable.just(p);
    }

// 输出
DEMO 8 ----
>>World>>
RECIEVE: NULL EX
DEMO 9 ----
>>Hello>>
OK: Hello>>>> +1
DEMO 10 ----
>>Hello>>
OK: Hello>>>> +1
DONE~!

总结

Rx Java 作为优秀的异步编程框架,是一个使用可观察数据流进行异步编程的编程接口,ReactiveX 结合了观察者模式、迭代器模式和函数式编程的精华。通过 Rx Java 的编程方式,我们可以解决循环嵌套的回调地狱,通过事件订阅的方式实现代码层次间的解耦。和 Java 自带的 Stream 相似的是,其丰富的操作符使我们对于数据流的操作更加简单。

REFERENCES

  • 关于 RxJava 最友好的文章——背压(Backpressure)
  • 如何形象的描述反应式编程中的背压(Backpressure)机制?
  • Rx Java 官方 API 3.x,Flowable
  • Rx Java Github
  • Rx Java 简述
  • 理解RxJava:(二)Operator,Operator
  • 理解RxJava:(三)RxJava的优点
  • 一文带你彻底了解java异步编程
  • Rx Java 中文文档
  • Rx Java Wiki
  • reactivex.io
  • Rxjava3文档级教程一:介绍和基本使用
  • RTP Tutorial with Rx Java
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-12-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 架构探险之道 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Rx Java 异步编程框架
    • 名词定义
      • 举个例子
        • 基本概念
          • Backpressure
          • Upstream, Downstream
          • Objects in motion
          • Assembly time
          • Subscription time
          • Runtime
        • 特性
          • Simple background computation
          • Schedulers
          • Concurrency within a flow
          • Parallel processing
          • Dependent sub-flows
          • Continuations
          • Type conversions
          • Operator naming conventions
          • R8 and ProGuard settings
        • 依赖
          • 核心类
            • Flowable
            • Observable
            • Single
            • Completable
            • Maybe
          • 调度器
            • 创建操作
              • create
              • defer
              • range
            • 变换操作
              • flatMap
              • take
              • filter
              • doOnNext
            • 错误处理
              • 总结
                • REFERENCES
                相关产品与服务
                数据库
                云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档