Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >通过Spring Boot Webflux实现Reactor Kafka

通过Spring Boot Webflux实现Reactor Kafka

原创
作者头像
本人秃顶程序员
修改于 2019-04-24 11:25:00
修改于 2019-04-24 11:25:00
3.6K10
代码可运行
举报
文章被收录于专栏:Java架构筑基Java架构筑基
运行总次数:0
代码可运行

在Apache Kafka简介中,我们研究了分布式流媒体平台Apache Kafka。这一次,我们将关注Reactor Kafka,这个库可以创建从Project Reactor到Kafka Topics的Reactive Streams,反之亦然。

我们将使用两个小型示例应用程序,Paymentprocessor Gateway和PaymentValidator。这些应用程序的代码可以在这里找到。

Paymentprocessor网关提供了一个小网页,可以生成一个随机的信用卡号码(显然是伪造的),以及支付金额。当用户单击提交按钮时,表单将提交给网关的API。API具有针对Kafka群集上的未确认事务主题的反应流,这个未确认事务的主题的另外一边消费者是PaymentValidator,监听要验证的传入消息。然后,这些消息通过响应管道,验证方法将其打印到命令行。

通过Reactive Streams向Kafka发送消息

我们的应用程序构建在Spring 5和Spring Boot 2之上,使我们能够快速设置和使用Project Reactor。

Gateway应用程序的目标是设置从Web控制器到Kafka集群的Reactive流。这意味着我们需要特定的依赖关系来弹簧webflux和reactor-kafka。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.1.0.RELEASE</version>
</dependency>

Spring Webflux RestController提供支付API,为paymentGateway类的doPayment方法创建一个Reactive流。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/ ** 
     *调用返回的Mono将被发送到Spring Webflux,后者依赖于multi-reactor 事件循环和NIO 
     *以非阻塞方式处理请求,从而实现更多的并发请求。结果将
     通过一个名为Server Sent Events 发送。
     ** /
@PostMapping(value = "/payment")
    public Mono<Void> doPayment(@RequestBody CreatePaymentCommand payment) {
    / ** 
             当调用doPayment方法时,我们发送付款信息,获得Mono <Void>作为响应。
             当我们的付款成功发送事件到Kafka主题
             ** / 
            return paymentGateway.doPayment(payment);
}

paymentGateway需要一个kafkaProducer,它使我们能够将消息作为管道的一部分放在Kafka主题中。它可以使用KafkaSender.create方法轻松创建,传递许多生产者选项。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public PaymentGatewayImpl() {
    final Map<String, Object> producerProps = new HashMap<>();
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    final SenderOptions<Integer, String> producerOptions = SenderOptions.create(producerProps);
    kafkaProducer = KafkaSender.create(producerOptions);
}

创建之后,kafkaProducer可以用来轻松地将我们的消息发送到选择的Kafka主题,成为控制器中启动的管道的一部分。因为消息是以非阻塞方式发送到Kafka集群的,所以我们可以使用项目Reactor的事件循环接收并将来自Web API的大量并发消息路由到Kafka。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Override
    public Mono<Void> doPayment(final CreatePaymentCommand createPayment) {
    final PaymentEvent payment = new PaymentEvent(createPayment.getId(), createPayment.getCreditCardNumber(), createPayment.getAmount(), gatewayName);
    String payload = toBinary(payment);
    SenderRecord<Integer, String, Integer> message = SenderRecord.create(new ProducerRecord<>("unconfirmed-transactions", payload), 1);
    return kafkaProducer.send(Mono.just(message)).next();
}
private String toBinary(Object object) {
    try {
        return objectMapper.writeValueAsString(object);
    }
    catch (JsonProcessingException e) {
        throw new IllegalArgumentException(e);
    }
}

从Kafka主题创建反应流

当没有消费者监听时,向主题发送消息没有多大意义,因此我们的第二个应用程序将使用一个反应管道来监听未确认的事务主题。为此,使用KafkaReceiver.create方法创建kafkaReceiver对象,类似于我们之前创建kafkaProducer的方法。

通过使用kafkaReceiver.receive方法,我们可以获得receiverRecords的Flux。进入我们读取的主题中每条消息都放入receiverRecord中。流入应用程序后,它们会进一步通过反应管道。然后,这些消息传递processEvent方法,该方法调用paymentValidator,该方法将一些信息输出到控制台。最后,在receiverOffset上调用acknowledge方法,向Kafka集群发送一条消息已被处理的确认。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public PaymentValidatorListenerImpl(PaymentValidator paymentValidator) {
    this.paymentValidator = paymentValidator;
    final Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "payment-validator-1");
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-validator");
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    ReceiverOptions<Object, Object> consumerOptions = ReceiverOptions.create(consumerProps)
                    .subscription(Collections.singleton("unconfirmed-transactions"))
                    .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
                    .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));
    kafkaReceiver = KafkaReceiver.create(consumerOptions);
    /**
         * We create a receiver for new unconfirmed transactions
         */
    ((Flux<ReceiverRecord>) kafkaReceiver.receive())
                    .doOnNext(r -> {
        /**
                     * Each unconfirmed payment we receive, we convert to a PaymentEvent and process it
                     */
        final PaymentEvent paymentEvent = fromBinary((String) r.value(), PaymentEvent.class);
        processEvent(paymentEvent);
        r.receiverOffset().acknowledge();
    }
    )
                    .subscribe();
}
private void processEvent(PaymentEvent paymentEvent) {
    paymentValidator.calculateResult(paymentEvent);
}
private <T> T fromBinary(String object, Class<T> resultType) {
    try {
        return objectMapper.readValue(object, resultType);
    }
    catch (IOException e) {
        throw new IllegalArgumentException(e);
    }
}

读者福利:

分享免费学习资料

针对于Java程序员,我这边准备免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)

为什么某些人会一直比你优秀,是因为他本身就很优秀还一直在持续努力变得更优秀,而你是不是还在满足于现状内心在窃喜!希望读到这的您能点个小赞和关注下我,以后还会更新技术干货,谢谢您的支持!

资料领取方式:加入Java技术交流群963944895点击加入群聊,私信管理员即可免费领取

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
1 条评论
热度
最新
java.lang.IllegalStateException: Multiple subscribers are not supported for KafkaReceiver flux at reactor.kafka.receiver.internals.DefaultKafkaReceiver.createConsumerFlux(DefaultKafkaReceiver.java:247) ~[reactor-kafka-1.2.0.RELEASE.jar:1.2.0.RELEASE] Suppressed: reactor.core.publisher.FluxOnAssemb
java.lang.IllegalStateException: Multiple subscribers are not supported for KafkaReceiver flux at reactor.kafka.receiver.internals.DefaultKafkaReceiver.createConsumerFlux(DefaultKafkaReceiver.java:247) ~[reactor-kafka-1.2.0.RELEASE.jar:1.2.0.RELEASE] Suppressed: reactor.core.publisher.FluxOnAssemb
回复回复点赞举报
推荐阅读
编辑精选文章
换一批
Kafka教程_图解kafka
推荐【Kafka教程】https://bigbird.blog.csdn.net/article/details/108770504 推荐【rabbitmq教程】https://bigbird.blog.csdn.net/article/details/81436980 推荐【Flink教程】https://blog.csdn.net/hellozpc/article/details/109413465 推荐【SpringBoot教程】https://blog.csdn.net/hellozpc/article/details/107095951 推荐【SpringCloud教程】https://blog.csdn.net/hellozpc/article/details/83692496 推荐【Mybatis教程】https://blog.csdn.net/hellozpc/article/details/80878563 推荐【SnowFlake教程】https://blog.csdn.net/hellozpc/article/details/108248227 推荐【并发限流教程】https://blog.csdn.net/hellozpc/article/details/107582771 推荐【JVM面试与调优教程】https://bigbird.blog.csdn.net/article/details/113888604
全栈程序员站长
2022/11/03
2.2K1
Kafka教程_图解kafka
超详细的Kafka教程-从部署到开发到原理都有讲解
在说Kafka之前,假设你有一定的消息队列的知识。知道消息队列的模式(点对点模式,发布/订阅模式),也知道消息队列的优点,如果不知道没关系,去百度或者Google搜索都有相关详细的资料。那么我们接下来说说Kafka。
Lvshen
2022/05/05
10.4K0
超详细的Kafka教程-从部署到开发到原理都有讲解
从Reactor到WebFlux
为了应对高并发场景下到服务端编程需求,微软最先提出了一种异步编程到方案Reactive Programming,也就是反应式编程。
春哥大魔王
2019/11/06
4.8K0
5分钟带你体验一把 Kafka
新建一个名为 zk-single-kafka-single.yml 的文件,文件内容如下:
Guide哥
2020/05/07
9330
5分钟带你体验一把 Kafka
Kafka生产者与消费者练习测试题
一、通过Producer API发送到kafka中的【topicHW】 注:topic自行创建 二、创建一个Consumer API程序,对kafka集群中的【topicHW】进行消费。 处理消费到的数据,将消费到的数据发送到另外一个名为topicDEAL 的topic中 ,要求如下:
火之高兴
2024/07/25
850
Kafka生产者与消费者练习测试题
Kafka的发布-订阅功能: Java实现与应用场景解析
Apache Kafka 是一个流行的分布式消息传递系统,广泛用于构建实时数据管道和流应用程序。它以高吞吐量、内置的分区、副本和容错性著称。Kafka 主要提供发布-订阅模式,这里我们将深入探讨 Kafka 的发布-订阅功能,通过一个 Java 代码示例和一个具体的应用场景。
GeekLiHua
2025/01/21
1850
kafka生产者Producer、消费者Consumer的拦截器interceptor
1、Producer的拦截器interceptor,和consumer端的拦截器interceptor是在kafka0.10版本被引入的,主要用于实现clients端的定制化控制逻辑,生产者拦截器可以用在消息发送前做一些准备工作,使用场景,如下所示:
别先生
2021/01/13
1.7K0
Kafka - 3.x Kafka 生产者分区技巧全面指北
消息在通过 send()方法发往 broker 的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。拦截器一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号。
小小工匠
2023/10/28
4470
Kafka - 3.x Kafka 生产者分区技巧全面指北
聊聊kafka0.8的topic的partition以及topicCountMap
本文主要研究下kafka0.8版本api的topicCountMap与topic的partition的关系。
code4it
2018/09/17
4350
centos7单机安装kafka,进行生产者消费者测试
原文出处:http://www.yund.tech/zdetail.html?type=1&id=3028469704c7976aef5b824811dd3bf5 作者:jstarseven  一、k
大道七哥
2019/08/23
7190
centos7单机安装kafka,进行生产者消费者测试
微服务同时接入多个Kafka
kafkaOneTemplate 定义第一个Kafka的高级模板,用来发送消息 kafkaOneContainerFactory 消费监听容器,配置在@KafkaListener中, producerFactory 生产者工厂 consumerFactory 消费者工厂 producerConfigs 生产者配置 consumerConfigs 消费者配置
阿提说说
2022/11/18
1.3K0
微服务同时接入多个Kafka
Kafka与Spark Streaming整合
Spark Streaming是一个可扩展,高吞吐,容错能力强的实时流式处理处理系统。一般的系统架构图是,数据从一个源点,经过Sparing Streaming处理,最后汇聚到一个系统。Spark Streaming的数据来源可以非常丰富,比如Kafka, Flume, Twitter, ZeroMQ, Kinesis 或者是任何的TCP sockets程序。对于数据的处理,Spark Streaming提供了非常丰富的高级api,例如map,redue,joini和窗口函数等等。数据处理完成后,可以存储到其他地方,比如文件系统,对象存储,数据库。典型的数据处理流程图:
haimingli
2021/01/13
5540
【Kafka】使用Java实现数据的生产和消费
Kafka 是由 LinkedIn 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。
后端码匠
2023/02/27
1.9K0
【Kafka】使用Java实现数据的生产和消费
怎么使用Kafka?收藏这篇短文就可以了
便于大家对本章内容的理解,我重新整理了一下Kafka中的部分重要概念,以表格的方式呈现出来,请见下表所示:
爪哇缪斯
2023/08/26
5270
怎么使用Kafka?收藏这篇短文就可以了
Kafka快速入门
LEO:Log End Offset,待写入消息的offset,即最后一条消息的offset+1
兜兜转转
2023/03/06
3660
Spring Boot Kafka概览、配置及优雅地实现发布订阅
本文属于翻译,转载注明出处,欢迎关注微信小程序小白AI博客 微信公众号小白AI或者网站 https://xiaobaiai.net
别打名名
2019/12/24
16K0
Spring Boot Kafka概览、配置及优雅地实现发布订阅
当Spring邂逅Kafka,有趣的知识增加了
Kafka起初是由LinkedIn公司采用Scala语言开发的一个多分区、多副本且基于ZooKeeper协调的分布式消息系统,现已被捐献给Apache基金会。
翊君
2022/03/08
1.1K0
spring boot整合kafka
最近项目需求用到了kafka信息中间件,在此做一次简单的记录,方便以后其它项目用到。
GreizLiao
2019/09/24
6500
【kafka系列】kafka之生产者发送消息实践
进入实战之前先熟悉一下topic的相关命令,使用终端命令查询创建一个新topic,用于后期实战; 特别注意:以下命令全部依据kafka文件目录中操作; 如果尚未安装kafka,请移步《centos7系统安装kafka》
沁溪源
2022/05/06
1.1K0
【kafka系列】kafka之生产者发送消息实践
Kafka从入门到进阶
如果想学习Java工程化、高性能及分布式、深入浅出。微服务、Spring,MyBatis,Netty源码分析的朋友可以加我的Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给大家。
java架构师
2018/12/28
1.1K0
相关推荐
Kafka教程_图解kafka
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验