前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >从Reactor到WebFlux

从Reactor到WebFlux

作者头像
春哥大魔王
发布于 2019-11-06 14:50:20
发布于 2019-11-06 14:50:20
4.8K00
代码可运行
举报
运行总次数:0
代码可运行

写在前面

为了应对高并发场景下到服务端编程需求,微软最先提出了一种异步编程到方案Reactive Programming,也就是反应式编程。

之后在Java社区就出现了RxJava和Akka Stream等技术方案,让Java平台在反应式编程上有了多种选择。

反应式编程

函数式编程

反应式编程一般是基于函数式编程实现的,函数式编程有如下特点:

  1. 惰性计算
  2. 函数是第一公民
  3. 只使用表达式而不是用语句

反应式编程是一种基于数据流,传递变化,声明式的编程范式。

事件驱动

思想是组件之间交互通过松耦合的生产者和消费者来实现的,并且事件以异步,非阻塞方式进行发送和接收。

事件驱动是系统通过推模式实现的,也就是生产者在消息产生时推送数据给消费者进行处理,而不是让消费者不断轮询或等待数据实现的。

响应及时

由于反应式是异步的,比如进行数据处理的话,在交出任务之后就快速返回,而不是阻塞的等待任务执行完毕再返回。任务的执行给到后台线程执行,等任务处理完成之后返回,比如Java8的CompletableFuture。

事件弹性

事件驱动系统是松耦合的,上下游之间不是直接依赖,但是在Debug时成本更高一些。

Spring Reactor

Spring Reactor是Pivotal基于反应式编程实现的一种方案。是一种非阻塞,事件驱动的编程方案,使用函数式编程实现。

观察者模式

反应式编程和命令式编程在迭代器上的实现:

  • 事件 Iterable (pull) Observable (push)
  • 获取数据 T next() onNext(T)
  • 发现异常 throws Exception onError(Exception)
  • 处理完成 hasNext() onCompleted()
  1. Publisher推送数据给Subscriber,触发onNext()方法,在处理完成或发生异常时触发onCompleted()和onError()方法。
  2. Publisher发生异常时,触发Subscriber的onError()方法,进行异常捕获处理。
  3. Publisher每次推送都会触发一次onNext()方法,所有推送完成时,最后触发onCompleted()方法。

背压

如果Publisher发布消息太快,超过Subscriber处理速度该怎么办?响应式编程引入了背压概念,使得Subscriber能够控制消费消息的速度。

Reactive Stream

在Java生态中,Netflix的RxJava,TypeSafe的Scala,Akaka,pivatol的Sping,Reactor都是反应式编程的框架。

Stream不是集合元素,不是数据结构,也不保存数据,只是关于算法和计算,更像一种可以编程的迭代器。

Stream可以并行操作,迭代器只能命令式的,串型操作。并行操作是将数据分成多段,每一个在不同线程中处理,最后将结果一起输出。这样可以大大利用硬件资源。

Reactor主要模块基于Netty实现:

  • reactor-core:包含核心API
  • reactor-ipc:复杂高性能网络通信

核心类:

  • Mono:代表0到1个元素发布者
  • Flux:代表0到N个元素发布者
  • Scheduler:代表事件驱动的反应流调度器,通常由各种线程池实现。

反应式编程概念总结:

  1. ReactiveStreams 是一套反应式编程 标准 和 规范;
  2. Reactor 是基于 ReactiveStreams 一套 反应式编程框架;
  3. WebFlux 以 Reactor 为基础,实现 Web 领域的 反应式编程框架。

Reactor开发

Reactor使用方式上基本分为三步:

  1. 开始阶段创建
  2. 中间阶段处理
  3. 最终阶段消费

创建阶段

Reactor编程需要先创建出Mono或Flux。

同步调用结果创建对象

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Mono<String> helloWorld = Mono.just("Hello World"); // 可以指定序列中包含的全部元素

Flux<String> fewWords = Flux.just("Hello","World");

Flux<String> manyWords = Flux.fromIterable(words);

这种方式一般用在经过一系列非IO型操作后,得到一个对应的对象,当需要将这个对象交给IO操作时,可以通过这种方式转换成Mono或Flux。

异步调用结果创建

如果异步得到结果,比如CompletableFuture可以创建一个Mono:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Mono.fromFuture(completableFuture);

如果这个异步调用不返回CompletableFuture,而有自己的回调方法,那么可以使用:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
static<T>Mono<T>create(Consumer<MonoSink<T>>callback)
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Mono.create(sink ->{

 ListenableFuture<ResponseEntity<String>> entity = asyncRestTemplate.getForEntity(url,String.class);

 entity.addCallback(new ListenableFutureCallback<ResponseEntity<String>>(){
 
 @Override
 public void onSuccess(ResponseEntity<String> result){
  sink.success(result.getBody());
 }

 @Override
 public void onFailure(Throwable ex)
 {
   sink.error(ex);
 }
});
});

处理阶段

在进行Mono和Flux处理阶段,一般使用filtermapflatMapthenzipreduce等。

map,flatMap,then 三个频率使用比较高。

数据处理方式

then

是下一步意思,代表执行顺序的下一步,不表示下一步依赖于上一步。then方法参数只是一个Mono,入参不是上一步的执行结果。

flatMap和map的参数是Function,是上一步执行的结果。

flatMap和map

传统的命令式编程

Object result1 = doStep1(params);

Object result2 = doStep2(result1);

Object result3 = doStep3(result2);

对应的反应式编程

Mono.just(params) .flatMap(v -> doStep1(v)) .flatMap(v -> doStep2(v)) .flatMap(v -> doStep3(v));

flatMap入参Function的返回值要求是Mono对象。map的入参Function只要求返回一个普通对象。对于一些返回值是Mono的方法,想将调用串联起链式调用,必须使用flatMap,而不是map。

并发处理方式

一般使用Mono.zip,Tuple2等。

传统编程方式并发执行是通过线程池+Future方式实现的。但是在做Future.get时是阻塞的。Reactor中使用Mono和Flux中的zip方法如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Mono<CustomType1> item1Mono = ...;

Mono<CustomType2> item2Mono = ...;

Mono.zip(items -> {
 CustomType1 item1 = CustomType1.class.cast(items[0]);
 CustomType2 item2 = CustomType2.class.cast(items[1]);
// Do merge
return mergeResult;
}, item1Mono, item2Mono);

这样item1Mono 和 item2Mono 过程是并行执行的。

使用zip方法时需要做类型强转换,类型强转换是不安全的

数据循环处理

一般使用:Flux.fromIterable(),Flux.reduce()方法。

比如:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Data initData = ...;
List<SubData> list = ...;

Flux.fromIterable(list)
  .reduce(initData,(data,itemInList) -> {
// Do something on data and itemInList
return data;
});

结束阶段

直接消费的Mono和Flux就是调用subscriber方法,其他的WebFlux接口可以直接返回框架的Response输出就可以了。

WebFlux

Serverlet3.1支持了异步处理方式,Servlet线程不需要一直阻塞的等待任务执行。Servlet在接收到请求后,将请求委托给业务线程完成,自己则直接返回继续接收新的请求。

所以Servlet3.1适用于那些业务处理非常耗时场景,这样可以减少服务器资源占用,可以提高并发处理速度,但是对于本身响应较为迅速的应用来说收益不大。

WebFlux的异步处理是基于Reactor实现的,是将输入流适配成Mono或Flux进行统一处理。

在最新的Spring Cloud Gateway中也是基于Netty和WebFlux实现的。

Flux和Mono

Flux和Mono属于事件发布者,类似于生产者,为消费者提供订阅接口。在实现发生时,Flux和Mono会回调消费者对应的方法通知消费者处理事件。Flux可以触发多个事件,Mono只触发一个事件。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Flux.fromIterable(getSomeLongList())
	.mergeWith(Flux.interval(100))
	.doOnNext(serviceA::someObserver)
	.map(d -> d * 2)
	.take(3)
	.onErrorResumeWith(errorHandler::fallback)
	.doAfterTerminate(serviceM::incrementTerminate)
	.subscribe(System.out::println
);

Mono.fromCallable(System::currentTimeMillis)
	.flatMap(time -> Mono.first(serviceA.findRecent(time), serviceB.findRecent(time)))
	.timeout(Duration.ofSeconds(3), errorHandler::fallback)
	.doOnSuccess(r -> serviceM.incrementSuccess())
	.subscribe(System.out::println);

选型注意

如果在框架中使用了WebFlux,他依赖的安全认证,数据访问都必须使用Reactive API,在存储层目前Reactive只支持MongoDBRedisCouchbase等几种不支持事务管理的NoSql,需要注意。

WebFlux并不能将接口耗时减少,只是可以减少线程扩展,提升系统的吞吐和伸缩能力。由于其为异步非阻塞Web框架,所以适用于IO密集型服务,比如我们交易网关这种。

WebFlux支持两种编程模式:

  1. 基于注解@Controller和其他的类Spring MVC的注解
  2. 函数式,Java8 lambda风格的路由处理

可以通过Reactive Streams实现背压控制。

ServerRequest和ServerResponse是JDK8友好访问底层HTTP消息的不可变接口。完全是响应式的。

实践建议

在使用lambda写处理函数时,如果多个处理函数可能缺乏可读性且不易于维护。可以将相关处理函数分组到一个处理程序或控制器类中。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-11-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 春哥talk 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
我对响应式编程中Mono和Flux的理解
很多同学反映对响应式编程中的Flux和Mono这两个Reactor中的概念有点懵逼。但是目前Java响应式编程中我们对这两个对象的接触又最多,诸如Spring WebFlux、RSocket、R2DBC。我开始也对这两个对象头疼,所以今天我们就简单来探讨一下它们。
码农小胖哥
2020/09/01
3K0
我对响应式编程中Mono和Flux的理解
Java 平台反应式编程(Reactive Programming)入门
反应式编程(Reactive Programming)对有些人来说可能相对陌生一点。反应式编程是一套完整的编程体系,既有其指导思想,又有相应的框架和库的支持,并且在生产环境中有大量实际的应用。在支持度方面,既有大公司参与实践,也有强大的开源社区的支持。 反应式编程出现的时间并不短,不过在最近的一段时间内,它得到了很大的关注。这主要体现在主流编程平台和框架增强了对它的支持,使它得到了更多的受众,同时也反映了其在开发中的价值。 就 Java 平台来说,几个突出的事件包括:Java 9中把反应式流规范以 java
CSDN技术头条
2018/03/26
8.9K0
Java 平台反应式编程(Reactive Programming)入门
【Spring底层原理高级进阶】基于Spring Boot和Spring WebFlux的实时推荐系统的核心:响应式编程与 WebFlux 的颠覆性变革
传统的Spring MVC架构是一种基于Java的Web应用程序开发框架,它遵循了MVC(Model-View-Controller)设计模式。下面将介绍传统Spring MVC架构的基本原理和组件:
苏泽
2024/03/01
4370
WebFlux 初体验
松哥原创的 Spring Boot 视频教程已经杀青,感兴趣的小伙伴戳这里-->Spring Boot+Vue+微人事视频教程
江南一点雨
2021/06/10
2.2K0
Spring-webflux 响应式编程
Spring 提供了两个并行堆栈。一种是基于带有 Spring MVC 和 Spring Data 结构的 Servlet API。另一个是完全反应式堆栈,它利用了 Spring WebFlux 和 Spring Data 的反应式存储库。在这两种情况下,Spring Security 都提供了对两种堆栈的支持。
鱼找水需要时间
2023/02/16
1.6K0
Spring-webflux 响应式编程
深入理解Reactor核心概念
随着 Web 应用和分布式系统的复杂性不断增加,传统的同步编程模型逐渐暴露出难以应对高并发、高吞吐量需求的局限性。Java 在 8 之后引入了大量新特性,包括响应式编程的出现。Reactor 是 Java 世界中实现响应式编程的一个重要库,它与 Spring WebFlux 紧密集成,并且构建在 Java 的 Reactive Streams 标准之上。
CoderJia
2024/10/20
2920
深入理解Reactor核心概念
一文了解Spring Framework 5 新 Web 框架:Spring WebFlux
Spring WebFlux 是 Spring Framework 5 引入的一个新的 Web 框架,用于构建反应式 Web 应用程序。与传统的基于 Servlet API 的 Spring MVC 框架不同,Spring WebFlux 基于 Reactor 库和 Reactive Streams 规范,使用异步非阻塞方式处理请求和响应,以提高应用程序的性能和可伸缩性。Spring WebFlux 支持多种编程模型和响应式编程范式,例如函数式编程、反应式编程和流式编程等,使得开发人员可以更加灵活和高效地构建 Web 应用程序。
网络技术联盟站
2023/05/03
2.6K0
一文了解Spring Framework 5 新 Web 框架:Spring WebFlux
Spring5之新功能Webflux
(1)是 Spring5 添加新的模块,用于 web 开发的,功能和 SpringMVC 类似的,Webflux 使用 当前一种比较流程响应式编程出现的框架。
yuanshuai
2022/08/22
9710
Spring5之新功能Webflux
Spring Boot 系列 —— Spring Webflux
Java 8提出了函数式接口的概念。所谓函数式接口,简单来说,就是只定义了单一抽象方法的接口。 【示例】
求和小熊猫
2022/06/30
1.6K0
Spring Boot 系列 —— Spring Webflux
艿艿连肝了几个周末,写了一篇贼长的 Spring 响应式 Web 框架 WebFlux!市面第二完整~
摘要: 原创出处 http://www.iocoder.cn/Spring-Boot/WebFlux/ 「芋道源码」欢迎转载,保留摘要,谢谢!
芋道源码
2020/06/01
6.2K0
深入剖析 Spring WebFlux
WebFlux 是 Spring Framework5.0 中引入的一种新的反应式Web框架。通过Reactor项目实现Reactive Streams规范,完全异步和非阻塞框架。本身不会加快程序执行速度,但在高并发情况下借助异步IO能够以少量而稳定的线程处理更高的吞吐,规避文件IO/网络IO阻塞带来的线程堆积。
冬夜先生
2021/10/12
1.3K0
异步编程 - 11 Spring WebFlux的异步非阻塞处理
我们这里主要探讨Spring框架5.0中引入的新的WebFlux技术栈,并介绍其存在的价值与意义、并发模型与适用场景、如何基于WebFlux实现异步编程,以及其内部的实现原理。
小小工匠
2023/09/09
2.6K0
异步编程 - 11 Spring WebFlux的异步非阻塞处理
为什么使用Reactive之反应式编程简介
前一篇分析了Spring WebFlux的设计及实现原理后,反应式编程又来了,Spring WebFlux其底层还是基于Reactive编程模型的,在java领域中,关于Reactive,有一个框架规范,叫【Reactive Streams】,在java9的ava.util.concurrent.Flow包中已经实现了这个规范。其他的优秀实现还有Reactor和Rxjava。在Spring WebFlux中依赖的就是Reactor。虽然你可能没用过Reactive开发过应用,但是或多会少你接触过异步Servlet,同时又有这么一种论调:异步化非阻塞io并不能增强太多的系统性能,但是也不可否认异步化后并发性能上去了。听到这种结论后在面对是否选择Reactive编程后,是不是非常模棱两可。因为我们不是很了解反应式编程,所以会有这种感觉。没关系,下面看看反应式编程集大者Reactor是怎么阐述反应式编程的。
kl博主
2023/11/18
4350
Spring5---新特性(WebFlux)
传统的web框架,比如springmvc,这些是基于servlet容器,webflux是一种异步非阻塞的框架,异步非阻塞的框架是在servlet 3.1 以后才支持的,核心是基于Reactor的相关API实现的
大忽悠爱学习
2021/11/15
1.8K0
Spring Boot 中的响应式编程和 WebFlux 入门
Spring 5.0 中发布了重量级组件 Webflux,拉起了响应式编程的规模使用序幕。
纯洁的微笑
2019/05/06
3.9K0
Spring Boot 中的响应式编程和 WebFlux 入门
响应式编程简介之:Reactor
Reactor是reactivex家族的一个非常重要的成员,Reactor是第四代的reactive library,它是基于Reactive Streams标准基础上开发的,主要用来构建JVM环境下的非阻塞应用程序。
程序那些事
2020/11/11
1.4K0
Java一分钟之-Spring WebFlux:响应式编程
在Java的世界里,Spring框架一直扮演着举足轻重的角色。随着技术的演进,Spring也与时俱进地推出了支持响应式编程模型的Spring WebFlux框架。本文将带你快速入门Spring WebFlux,探讨其核心概念、常见问题、易错点及规避策略,并通过代码示例让你直观感受响应式编程的魅力。
Jimaks
2024/06/16
4250
Reactor响应式编程 之 简介
Reactor 项目始于 2012 年。 经过长时间的内部孵化,于 2013 年发布 Reactor 1.x 版本。 Reactor 1 在各种架构下都能成功部署,包括开源的(如 Meltdown)和商业的(如 Pivotal RTI)。2014年,通过与一些新兴的响应式数据流规范合作,重新设计并于 2015 年 4 月发布 Reactor 2.0 版本。
伍六七AI编程
2022/11/02
1.3K0
外行人都能看懂的WebFlux,错过了血亏
如果有关注我公众号文章的同学就会发现,最近我不定时转发了一些比较好的WebFlux的文章,因为我最近在学。
Rude3Knife的公众号
2019/11/18
6620
今日榜首|10年高级技术专家用7000字带你详解响应式技术框架
对于响应式编程来说,响应式流是一种非阻塞、响应式、异步流处理、支持背压的技术标准,包括运行时环境(JVM和JavaScript)及网络协议。JDK 9发布的Flow API(java.util.concurrent.Flow)和响应式流规范呼应,成为响应式编程事实上的标准。
愿天堂没有BUG
2022/10/28
1.6K0
今日榜首|10年高级技术专家用7000字带你详解响应式技术框架
推荐阅读
相关推荐
我对响应式编程中Mono和Flux的理解
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验