Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >通过Spring Boot Webflux实现Reactor Kafka

通过Spring Boot Webflux实现Reactor Kafka

原创
作者头像
本人秃顶程序员
修改于 2019-04-24 11:25:00
修改于 2019-04-24 11:25:00
3.6K10
代码可运行
举报
文章被收录于专栏:Java架构筑基Java架构筑基
运行总次数:0
代码可运行

在Apache Kafka简介中,我们研究了分布式流媒体平台Apache Kafka。这一次,我们将关注Reactor Kafka,这个库可以创建从Project Reactor到Kafka Topics的Reactive Streams,反之亦然。

我们将使用两个小型示例应用程序,Paymentprocessor Gateway和PaymentValidator。这些应用程序的代码可以在这里找到。

Paymentprocessor网关提供了一个小网页,可以生成一个随机的信用卡号码(显然是伪造的),以及支付金额。当用户单击提交按钮时,表单将提交给网关的API。API具有针对Kafka群集上的未确认事务主题的反应流,这个未确认事务的主题的另外一边消费者是PaymentValidator,监听要验证的传入消息。然后,这些消息通过响应管道,验证方法将其打印到命令行。

通过Reactive Streams向Kafka发送消息

我们的应用程序构建在Spring 5和Spring Boot 2之上,使我们能够快速设置和使用Project Reactor。

Gateway应用程序的目标是设置从Web控制器到Kafka集群的Reactive流。这意味着我们需要特定的依赖关系来弹簧webflux和reactor-kafka。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.1.0.RELEASE</version>
</dependency>

Spring Webflux RestController提供支付API,为paymentGateway类的doPayment方法创建一个Reactive流。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/ ** 
     *调用返回的Mono将被发送到Spring Webflux,后者依赖于multi-reactor 事件循环和NIO 
     *以非阻塞方式处理请求,从而实现更多的并发请求。结果将
     通过一个名为Server Sent Events 发送。
     ** /
@PostMapping(value = "/payment")
    public Mono<Void> doPayment(@RequestBody CreatePaymentCommand payment) {
    / ** 
             当调用doPayment方法时,我们发送付款信息,获得Mono <Void>作为响应。
             当我们的付款成功发送事件到Kafka主题
             ** / 
            return paymentGateway.doPayment(payment);
}

paymentGateway需要一个kafkaProducer,它使我们能够将消息作为管道的一部分放在Kafka主题中。它可以使用KafkaSender.create方法轻松创建,传递许多生产者选项。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public PaymentGatewayImpl() {
    final Map<String, Object> producerProps = new HashMap<>();
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    final SenderOptions<Integer, String> producerOptions = SenderOptions.create(producerProps);
    kafkaProducer = KafkaSender.create(producerOptions);
}

创建之后,kafkaProducer可以用来轻松地将我们的消息发送到选择的Kafka主题,成为控制器中启动的管道的一部分。因为消息是以非阻塞方式发送到Kafka集群的,所以我们可以使用项目Reactor的事件循环接收并将来自Web API的大量并发消息路由到Kafka。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Override
    public Mono<Void> doPayment(final CreatePaymentCommand createPayment) {
    final PaymentEvent payment = new PaymentEvent(createPayment.getId(), createPayment.getCreditCardNumber(), createPayment.getAmount(), gatewayName);
    String payload = toBinary(payment);
    SenderRecord<Integer, String, Integer> message = SenderRecord.create(new ProducerRecord<>("unconfirmed-transactions", payload), 1);
    return kafkaProducer.send(Mono.just(message)).next();
}
private String toBinary(Object object) {
    try {
        return objectMapper.writeValueAsString(object);
    }
    catch (JsonProcessingException e) {
        throw new IllegalArgumentException(e);
    }
}

从Kafka主题创建反应流

当没有消费者监听时,向主题发送消息没有多大意义,因此我们的第二个应用程序将使用一个反应管道来监听未确认的事务主题。为此,使用KafkaReceiver.create方法创建kafkaReceiver对象,类似于我们之前创建kafkaProducer的方法。

通过使用kafkaReceiver.receive方法,我们可以获得receiverRecords的Flux。进入我们读取的主题中每条消息都放入receiverRecord中。流入应用程序后,它们会进一步通过反应管道。然后,这些消息传递processEvent方法,该方法调用paymentValidator,该方法将一些信息输出到控制台。最后,在receiverOffset上调用acknowledge方法,向Kafka集群发送一条消息已被处理的确认。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public PaymentValidatorListenerImpl(PaymentValidator paymentValidator) {
    this.paymentValidator = paymentValidator;
    final Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "payment-validator-1");
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-validator");
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    ReceiverOptions<Object, Object> consumerOptions = ReceiverOptions.create(consumerProps)
                    .subscription(Collections.singleton("unconfirmed-transactions"))
                    .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
                    .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));
    kafkaReceiver = KafkaReceiver.create(consumerOptions);
    /**
         * We create a receiver for new unconfirmed transactions
         */
    ((Flux<ReceiverRecord>) kafkaReceiver.receive())
                    .doOnNext(r -> {
        /**
                     * Each unconfirmed payment we receive, we convert to a PaymentEvent and process it
                     */
        final PaymentEvent paymentEvent = fromBinary((String) r.value(), PaymentEvent.class);
        processEvent(paymentEvent);
        r.receiverOffset().acknowledge();
    }
    )
                    .subscribe();
}
private void processEvent(PaymentEvent paymentEvent) {
    paymentValidator.calculateResult(paymentEvent);
}
private <T> T fromBinary(String object, Class<T> resultType) {
    try {
        return objectMapper.readValue(object, resultType);
    }
    catch (IOException e) {
        throw new IllegalArgumentException(e);
    }
}

读者福利:

分享免费学习资料

针对于Java程序员,我这边准备免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)

为什么某些人会一直比你优秀,是因为他本身就很优秀还一直在持续努力变得更优秀,而你是不是还在满足于现状内心在窃喜!希望读到这的您能点个小赞和关注下我,以后还会更新技术干货,谢谢您的支持!

资料领取方式:加入Java技术交流群963944895点击加入群聊,私信管理员即可免费领取

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
1 条评论
热度
最新
java.lang.IllegalStateException: Multiple subscribers are not supported for KafkaReceiver flux at reactor.kafka.receiver.internals.DefaultKafkaReceiver.createConsumerFlux(DefaultKafkaReceiver.java:247) ~[reactor-kafka-1.2.0.RELEASE.jar:1.2.0.RELEASE] Suppressed: reactor.core.publisher.FluxOnAssemb
java.lang.IllegalStateException: Multiple subscribers are not supported for KafkaReceiver flux at reactor.kafka.receiver.internals.DefaultKafkaReceiver.createConsumerFlux(DefaultKafkaReceiver.java:247) ~[reactor-kafka-1.2.0.RELEASE.jar:1.2.0.RELEASE] Suppressed: reactor.core.publisher.FluxOnAssemb
回复回复点赞举报
推荐阅读
编辑精选文章
换一批
Java实现多用户即时通信系统
因为还没有学习数据库,我们人为规定用户名/id = 100, 密码123456 就可以登录,其它用户不能登录 后面使用HashMap模拟数据库,可以多个用户登录。
timerring
2023/05/24
1.5K0
Java实现多用户即时通信系统
java项目:客户管理系统
源码已经打包在博客末尾,有需要的话自行下载。 java 项目: 客户信息管理系统 环境: ​ Idea 集成开发工具 技术点: 面向对象 数组 项目结构: 20200801181546.png 20200801180817.png 代码展示: 【bean】 Customer.java public class Customer { private String name; // 姓名 private char gender; // 性别 private int age; // 年龄
OY
2022/03/12
2.6K0
java项目:客户管理系统
Java数据库项目之满汉楼
src/com/hspedu/mhl/dao/DiningTableDAO.java
timerring
2023/05/24
6720
Java数据库项目之满汉楼
Java 多用户即时通信系统
# Java 多用户即时通信系统 # 涉及到Java各个方面的技术 项目框架设计 java面向对象编程 网络编程 多线程 IO流 Mysql/使用集合充当内存数据库 # 需求分析 用户登录 拉取在线用户列表 无异常退出(客户端、服务端) 私聊 群聊 发文件 服务器推送新闻 # 界面设计 # 功能实现-用户登录 # 功能实现-拉取在线用户列表 # 功能实现-无异常退出 # 功能实现-私聊 # 功能实现-群聊 # 功能实现-发文件 # 功能实现-服务器推送新闻
用户9615083
2022/12/25
3.2K0
Java 多用户即时通信系统
一个面向对象的Java客户管理系统
1 项目目录 image.png 2 CMUtility 工具类 package com.binbin.p2.util; import java.util.*; /** CMUtility工具类: 将不同的功能封装为方法,就是可以直接通过调用方法使用它的功能,而无需考虑具体的功能实现细节。 */ public class CMUtility { private static Scanner scanner = new Scanner(System.in); /** 用于界面菜单的选择。该
Linuxcc
2022/02/07
7310
第三章:流程控制语句
说明:一旦表达式为true , 则进入执行相应的语句块,执行完成对应的语句块之后 ,就跳出当前结构
捞月亮的小北
2024/01/18
2050
第三章:流程控制语句
房屋出售出租系统 毕业设计 JAVA+Vue+SpringBoot+MySQL
基于JAVA+Vue+SpringBoot+MySQL的房屋出售出租系统包含了房屋销售模块、房屋出租模块、预定意向模块、交易订单模块、图表分析模块,还包含系统自带的用户管理、部门管理、角色管理、菜单管理、日志管理、数据字典管理、文件管理、图表展示等基础模块,房屋出售出租系统基于角色的访问控制,给管理员、买家和卖家角色使用,可将权限精确到按钮级别,您可以自定义角色并分配权限,系统适合设计精确的权限约束需求。
Designer 小郑
2023/11/28
6030
房屋出售出租系统 毕业设计 JAVA+Vue+SpringBoot+MySQL
Vue+SpringBoot打造房屋租售信息系统
本软件是基于 Vue 和 SpringBoot 的房屋出租出售信息系统,主要包含了房屋租赁、房屋出售、房屋交易这三条业务线,开发者可以在这个项目上进行二次开发,以满足自己的需要。
Designer 小郑
2023/08/01
3810
Vue+SpringBoot打造房屋租售信息系统
快速理解 Java 静态代理 / 动态代理
在有些情况下,一个客户不能或者不想直接访问另一个对象,这时需要找一个中介帮忙完成某项任务,这个中介就是代理对象。
BUG弄潮儿
2021/04/26
3510
Java IO流详解
getName、getAbsolutePath、getParent、length、exists、isFile、isDirectory
timerring
2023/05/09
3980
Java IO流详解
SSM框架整合项目 :租房管理系统
 使用ssm框架整合,oracle数据库 框架: Spring SpringMVC MyBatis 导包: 1, spring 2, MyBatis 3, mybatis-spring 4, fast
二十三年蝉
2018/02/28
2.5K0
SSM框架整合项目 :租房管理系统
JAVA 代理
在讲代理之前,先说一下何为代理,通俗的讲解,当我们在购买房子时,我们需要通过中介来进行,即中介从房东手中获取房源,我们通过中介了解到房屋信息,中介起到了一个中间人的作用,此时他相当于代理。
用户9691112
2023/06/27
1380
JAVA 代理
Java面向对象编程中级
只能包含数字、字母、下划线、小圆点.,但不能用数字开头,不能是关键字或保留字。 命名规范
timerring
2023/04/21
5110
Java面向对象编程中级
java之 代理设计模式
1. 设计一个案例来实现租房功能。分析:在租房的过程中涉及到了3个对象,房东,中介,房客。
Vincent-yuan
2019/09/11
3510
爬虫实战--拿下最全租房数据 | 附源码
上一篇的实战只是给大家作为一个练手,数据内容比较少,且官网也有对应的 API,难度不大。
小一不二三
2020/01/08
2.7K1
爬虫实战--拿下最全租房数据 | 附源码
Java编写汽车租赁系统
控制台输入的内容,我选择的是输入字符串类型,没有按照效果图上,如果你做的话,你可以用三元运算符来实现哦!
Java团长
2021/01/05
2K0
Java编写汽车租赁系统
198. House Robber(打家劫舍)(求不相邻的位置上的数字之和的最大值)
You are a professional robber planning to rob houses along a street. Each house has a certain amount of money stashed, the only constraint stopping you from robbing each of them is that adjacent houses have security system connected and it will automatically contact the police if two adjacent houses were broken into on the same night.
砖业洋__
2023/05/06
2550
198. House Robber(打家劫舍)(求不相邻的位置上的数字之和的最大值)
客户信息管理系统_销售找客户最好的app
客户信息管理系统,功能如下: (1)添加客户信息 (2)修改客户信息 (3)删除客户数据 (4)查询客户列表 (5)所有数据通过JDBC保存到MySql数据库中 1,数据库名:cms_hisoft 2,表名:users 3,字段列表和类型: id,int,主键,自动增长 name,varchar(20),姓名 gender,varchar(5),性别 age,int,年龄 phone…
全栈程序员站长
2022/11/02
1.5K0
JDBC和数据库连接池
说明:JDBC是Java提供一套用于数据库操作的接口APl, Java程序员只需要面向这套接口编程即可。不同的数据库厂商,需要针对这套接口,提供不同实现。
timerring
2023/05/24
9310
JDBC和数据库连接池
Java常用类应用详解
可以通过图查询到其含有的字段和方法,jump to source 可以查看到源码。
timerring
2023/05/01
4880
Java常用类应用详解
相关推荐
Java实现多用户即时通信系统
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验