
响应式编程是一种关注于数据流(data streams)和变化传递(propagation of change)的异步编程方式。这意味着它可以用既有的编程语言表达静态(如数组)或动态(如事件源)的数据流。
在响应式编程方面,微软跨出了第一步,它在 .NET 生态中创建了响应式扩展库(Reactive Extensions library, Rx)。接着 RxJava 在 JVM 上实现了响应式编程。后来,在 JVM 平台出现了一套标准的响应式 编程规范,它定义了一系列标准接口和交互规范。并整合到 Java 9 中(Flow 类)。
响应式编程通常作为面向对象编程中的“观察者模式”(Observer design pattern)的一种扩展。响应式流(reactive streams)与“迭代子模式”(Iterator design pattern)也有相通之处, 因为其中也有 Iterable-Iterator 这样的对应关系。主要的区别在于,Iterator 是基于 “拉取”(pull)方式的,而响应式流是基于“推送”(push)方式的。
此外,对推送来的数据的操作是通过一种声明式(declaratively)而不是命令式(imperatively)的方式表达的:开发者通过描述“控制流程”来定义对数据流的处理逻辑。
除了数据推送,对错误处理(error handling)和完成(completion)信号的定义也很完善。一个 Publisher 可以推送新的值到它的 Subscriber(调用 onNext 方法), 同样也可以推送错误(调用 onError 方法)和完成(调用 onComplete 方法)信号。错误和完成信号都可以终止响应式流。可以用下边的表达式描述:
onNext x 0..N [onError | onComplete]这种方式非常灵活,无论是有/没有值,还是 n 个值(包括有无限个值的流,比如时钟的持续读秒),都可处理。
以上来自 https://projectreactor.io/docs/core/release/reference/ 翻译
Reactive Streams 是上面提到的一套标准的响应式编程规范。它由四个核心概念构成:
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}public interface Subscription {
// request(n)用来发起请求数据,其中n表示请求数据的数量,它必须大于0,
// 否则会抛出IllegalArgumentException,并触发onError,request的调用会
// 累加,如果没有终止,最后会触发相应次数的onNext方法.
public void request(long n);
// cancel相当于取消订阅,调用之后,后续不会再收到订阅,onError 和
// onComplete也不会被触发
public void cancel();
}public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}Reactive Streams 通过上面的四个核心概念和相关的函数,对响应式流进行了一个框架性的约定,它没有具体实现。简单来说,它只提供通用的、合适的解决方案,大家都按照这个规约来实现就好了。
Java 的 Reactive Programming 类库主要有三个,分别是 Akka-Streams ,RxJava 和 Project Reactor。Spring 5 开始支持 Reactive Programming,其底层使用的是 Project Reactor。本篇主要是对 Project Reactor 中的一些点进行学习总结。
Project Reactor 是一个基于 Java 8 的实现了响应式流规范 (Reactive Streams specification)的响应式库。
Reactor 引入了实现 Publisher 的响应式类 Flux 和 Mono,以及丰富的操作方式。一个 Flux 对象代表一个包含 0..N 个元素的响应式序列,而一个 Mono 对象代表一个包含零或者一个(0..1)元素的结果。
Flux 是生产者,即我们上面提到的 Publisher,它代表的是一个包含 0-N 个元素的异步序列,Mono可以看做 Flux 的有一个特例,代表 0-1 个元素,如果不需要生产任何元素,只是需要一个完成任务的信号,可以使用 Mono。

先来看这张图,这里是直接从官方文档上贴过来的。就这张图做下说明,先来关注几个点:
那整体来看就是 Flux 产生元数据,通过一系列 operator 操作得到转换结果,正常成功就是 onCompleted,出现错误就是 onError。看下面的一个小例子:
Flux.just("glmapper","leishu").subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription subscription) {
// subscription 表示订阅关系
System.out.println("onSubscribe,"+ subscription.getClass());
// subscription 通过 request 来触发 onNext
subscription.request(2);
}
@Override
public void onNext(String s) {
System.out.println("currrent value is = " + s);
}
@Override
public void onError(Throwable throwable) {
System.out.println("it's error.");
}
@Override
public void onComplete() {
System.out.println("it's completed.");
}
});执行结果:
onSubscribe,class reactor.core.publisher.StrictSubscriber
currrent value is = glmapper
currrent value is = leishu
it's completed.如果在 onSubscribe 方法中我们不执行 request,则不会有后续任何操作。关于 request 下面看。
Flux是一个能够发出 0 到 N 个元素的标准的 Publisher,它会被一个 "error" 或 "completion" 信号终止。因此,一个 Flux 的结果可能是一个 value、completion 或 error。就像在响应式流规范中规定的那样,这三种类型的信号被翻译为面向下游的
onNext,onComplete和onError方法。

这张图也来自官方文档,和上面 Flux 的区别就是,Mono 最多只能 emitted 一个元素。
Mono.just("glmapper").subscribe(System.out::println);通过上面两段小的代码来看,最直观的感受是,Flux 相当于一个 List,Mono 相当于 Optional。其实在编程中所有的结果我们都可以用 List 来 表示,但是当只返回一个或者没有结果时,用 Optional 可能会更精确些。
Optional 相关概念可自行搜索 jdk Optional
另外,Mono 和 Flux 都提供了一些工厂方法,用于创建相关的实例,这里简单罗列一下:
// 可以指定序列中包含的全部元素。创建出来的 Flux
// 序列在发布这些元素之后会自动结束。
Flux.just("glmapper", "leishu");
// 从一个Iterable 对象中创建 Flux 对象,当然还可以是数组、Stream对象等
Flux.fromIterable(Arrays.asList("glmapper","leishu"));
// 创建一个只包含错误消息的序列。
Flux.error(new IllegalStateException());
// 创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间
// 隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。
Flux.interval(Duration.ofMillis(100)).take(10);
// 创建一个不包含任何消息通知的序列。
Flux.never();
// 创建一个不包含任何元素,只发布结束消息的序列。
Flux.empty();
// 创建包含从 start 起始的 count 个数量的 Integer 对象的序列
Flux.range(int start, int count);
// Mono 同上
Mono.empty();
Mono.never();
Mono.just("glmapper");
Mono.error(new IllegalStateException());上面的这些静态方法适合于简单的序列生成,当序列的生成需要复杂的逻辑时,则应该使用 generate() 或 create() 方法。
Reactor 的核心调用过程大致可以分为图中的几个阶段

当需要处理 Flux 或 Mono 中的消息时,可以通过 subscribe 方法来添加相应的订阅逻辑。在调用 subscribe 方法时可以指定需要处理的消息类型。可以只处理其中包含的正常消息,也可以同时处理错误消息和完成消息。
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.subscribe(System.out::println, System.err::println);结果:
1
2
java.lang.IllegalStateException正常的消息处理相对简单。当出现错误时,有多种不同的处理策略:
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.onErrorReturn(0)
.subscribe(System.out::println);结果:
1
2
0 Flux.just(1, 2)
.concatWith(Mono.error(new IllegalArgumentException()))
.onErrorResume(e -> {
if (e instanceof IllegalStateException) {
return Mono.just(0);
} else if (e instanceof IllegalArgumentException) {
return Mono.just(-1);
}
return Mono.empty();
}).subscribe(System.out::println);结果:
1
2
-1Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.retry(1)
.subscribe(System.out::println);结果:
1
2
1
2
Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException
Caused by: java.lang.IllegalStateException
at com.glmapper.bridge.boot.reactor.SimpleTest.testFluxSub(SimpleTest.java:75)
at com.glmapper.bridge.boot.reactor.SimpleTest.main(SimpleTest.java:23)在 Reactor 中,执行模式以及执行过程取决于所使用的 Scheduler,Scheduler 是一个拥有广泛实现类的抽象接口,Schedulers 类提供的静态方法用于达成如下的执行环境:
Schedulers.immediate().schedule(()->{
System.out.println(Thread.currentThread().getName()+"-"+11);
});
// main-11Schedulers.single().schedule(()->{
System.out.println(Thread.currentThread().getName()+"-"+11);
});
// single-1-11Schedulers.elastic().schedule(()->{
System.out.println(Thread.currentThread().getName()+"-"+11);
});
// elastic-2-11Schedulers.parallel().schedule(()->{
System.out.println(Thread.currentThread().getName()+"-"+11);
});
// parallel-1-11ExecutorService executorService = Executors.newSingleThreadExecutor();
Schedulers.fromExecutorService(executorService).schedule(()->{
System.out.println(Thread.currentThread().getName()+"-"+11);
});
// pool-4-thread-1-11Schedulers.newElastic("test-elastic").schedule(()->{
System.out.println(Thread.currentThread().getName()+"-"+11);
});
// test-elastic-4-11一些操作符默认会使用一个指定的调度器(通常也允许开发者调整为其他调度器)例如, 通过工厂方法 Flux.interval(Duration.ofMillis(100)) 生成的每 100ms 打点一次的 Flux, 默认情况下使用的是 Schedulers.parallel(),下边的代码演示了如何将其装换为 Schedulers.single()
Flux<String> intervalResult = Flux.interval(Duration.ofMillis(100),
Schedulers.newSingle("test"))
.map(i -> Thread.currentThread().getName() +"@"+i);
intervalResult.subscribe(System.out::println);结果:
test-1@0
test-1@1
test-1@2
test-1@3
test-1@4
// 省略Reactor 提供了两种在响应式链中调整调度器 Scheduler 的方法:publishOn 和 subscribeOn。它们都接受一个 Scheduler 作为参数,从而可以改变调度器。但是 publishOn 在链中出现的位置是有讲究的,而 subscribeOn 则无所谓。
Flux.create(sink -> {
sink.next(Thread.currentThread().getName());
sink.complete();
})
.publishOn(Schedulers.single())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.publishOn(Schedulers.elastic())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.subscribeOn(Schedulers.parallel())
.toStream()
.forEach(System.out::println);结果:
[elastic-2] [single-1] parallel-1上面这段代码使用 create() 方法创建一个新的 Flux 对象,其中包含唯一的元素是当前线程的名称。
接着是两对 publishOn() 和 map()方法,其作用是先切换执行时的调度器,再把当前的线程名称作为前缀添加。
最后通过 subscribeOn()方法来改变流产生时的执行方式。
最内层的线程名字 parallel-1 来自产生流中元素时使用的 Schedulers.parallel()调度器,中间的线程名称 single-1 来自第一个 map 操作之前的 Schedulers.single() 调度器,最外层的线程名字 elastic-2 来自第二个 map 操作之前的 Schedulers.elastic()调度器。
先到这里,剩下的想到再补充...