前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >SpringBoot整合分布式消息平台Pulsar

SpringBoot整合分布式消息平台Pulsar

作者头像
jinjunzhu
发布于 2022-09-23 01:16:44
发布于 2022-09-23 01:16:44
74300
代码可运行
举报
文章被收录于专栏:个人开发个人开发
运行总次数:0
代码可运行

大家好,我是君哥。

作为优秀的消息流平台,Pulsar 的使用越来越多,这篇文章讲解 Pulsar 的 Java 客户端。

部署 Pulsar

Pulsar 的部署方式主要有 3 种,本地安装二进制文件、docker 部署、在 Kubernetes 上部署。

本文采用 docker 部署一个单节点的 Pulsar 集群。实验环境是 2 核 CPU 和 4G 内存。

部署命令如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
docker run -it -p 6650:6650  -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:2.9.1 bin/pulsar standalone

安装过程可能会出现下面的错误:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
unknown flag: --mount
See 'docker run --help'.

这是因为 docker 版本低,不支持 mount 参数,把 docker 版本升级到 17.06 以上就可以了。

部署过程中可能会因为网络的原因失败,多试几次就可以成功了。如果看到下面的日志,就说明启动成功了。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
2022-01-08T22:27:58,726+0000 [main] INFO  org.apache.pulsar.broker.PulsarService - messaging service is ready, bootstrap service port = 8080, broker url= pulsar://localhost:6650, cluster=standalone

本地单节点集群启动后,会创建一个 namespace,名字叫 public/default

Pulsar 客户端

目前 Pulsar 支持多种语言的客户端,包括:

Java 客户端

Go 客户端

Python 客户端

C++ 客户端

Node.js 客户端

WebSocket 客户端

C# 客户端

SpringBoot 配置

使用 SpringBoot 整合 Pulsar 客户端,首先引入 Pulsar 客户端依赖,代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.9.1</version>
</dependency>

然后在 properties 文件中添加配置:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# Pulsar 地址
pulsar.url=pulsar://192.168.59.155:6650
# topic
pulsar.topic=testTopic
# consumer group
pulsar.subscription=topicGroup

创建 Client

创建客户端非常简单,代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
client = PulsarClient.builder()
                .serviceUrl(url)
                .build();

上面的 url 就是 properties 文件中定义的 pulsar.url 。

创建 Client 时,即使集群没有启成功,程序也不会报错,因为这时还没有真正地去连接集群。

创建 Producer

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
producer = client.newProducer()
                .topic(topic)
                .compressionType(CompressionType.LZ4)
                .sendTimeout(0, TimeUnit.SECONDS)
                .enableBatching(true)
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                .batchingMaxMessages(1000)
                .maxPendingMessages(1000)
                .blockIfQueueFull(true)
                .roundRobinRouterBatchingPartitionSwitchFrequency(10)
                .batcherBuilder(BatcherBuilder.DEFAULT)
                .create();

创建 Producer,会真正的连接集群,这时如果集群有问题,就会报连接错误。

下面解释一下创建 Producer 的参数:

topic:Producer 要写入的 topic。

compressionType:压缩策略,目前支持 4 种策略 (NONE、LZ4、ZLIB、ZSTD),从 Pulsar2.3 开始,只有 Consumer 的版本在 2.3 以上,这个策略才会生效。

sendTimeout:超时时间,如果 Producer 在超时时间为收到 ACK,会进行重新发送。

enableBatching:是否开启消息批量处理,这里默认 true,这个参数只有在异步发送 (sendAsync) 时才能生效,选择同步发送会失效。

batchingMaxPublishDelay:批量发送消息的时间段,这里定义的是 10ms,需要注意的是,设置了批量时间,就不会受消息数量的影响。批量发送会把要发送的批量消息放在一个网络包里发送出去,减少网络 IO 次数,大大提高网卡的发送效率。

batchingMaxMessages:批量发送消息的最大数量。

maxPendingMessages:等待从 broker 接收 ACK 的消息队列最大长度。如果这个队列满了,producer 所有的 sendAsync 和 send 都会失败,除非设置了 blockIfQueueFull 值是 true。

blockIfQueueFull:Producer 发送消息时会把消息先放入本地 Queue 缓存,如果缓存满了,就会阻塞消息发送。

roundRobinRouterBatchingPartition-SwitchFrequency:如果发送消息时没有指定 key,那默认采用 round robin 的方式发送消息,使用 round robin 的方式,切换 partition 的周期是 (frequency * batchingMaxPublishDelay)。

创建 Consumer

Pulsar 的消费模型如下图:

从图中可以看到,Consumer 要绑定一个 subscription 才能进行消费。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
consumer = client.newConsumer()
        .topic(topic)
        .subscriptionName(subscription)
        .subscriptionType(SubscriptionType.Shared)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .negativeAckRedeliveryDelay(60, TimeUnit.SECONDS)
        .receiverQueueSize(1000)
        .subscribe();

下面解释一下创建 Consumer 的参数:

topic:Consumer 要订阅的 topic。

subscriptionName:consumer 要关联的 subscription 名字。

subscriptionType:订阅类型,Pulsar 支持四种类型订阅:

Exclusive:独占模式,同一个 Topic 只能有一个消费者,如果多个消费者,就会出错。

Failover灾备模式,同一个 Topic 可以有多个消费者,但是只能有一个消费者消费,其他消费者作为故障转移备用,如果当前消费者出了故障,就从备用消费者中选择一个进行消费。如下图:

Shared:共享模式,同一个 Topic 可以由多个消费者订阅和消费。消息通过 round robin 轮询机制分发给不同的消费者,并且每个消息仅会被分发给一个消费者。当消费者断开,如果发送给它消息没有被消费,这些消息会被重新分发给其它存活的消费者。如下图:

Key_Shared:消息和消费者都会绑定一个key,消息只会发送给绑定同一个key的消费者。如果有新消费者建立连接或者有消费者断开连接,就需要更新一些消息的 key。跟 Shared 模式相比,Key_Shared 的好处是既可以让消费者并发地消费消息,又能保证同一Key下的消息顺序。如下图:

subscriptionInitialPosition:创建新的 subscription 时从哪里开始消费,有两个选项:

Latest:从最新的消息开始消费

Earliest:从最早的消息开始消费

negativeAckRedeliveryDelay:消费失败后间隔多久 broker 重新发送。

receiverQueueSize:在调用 receive 方法之前,最多能累积多少条消息。可以设置为 0,这样每次只从 broker 拉取一条消息。在 Shared 模式下,receiverQueueSize 设置为 0,可以防止批量消息多发给一个 Consumer 而导致其他 Consumer 空闲。

Consumer 接收消息有四种方式:同步单条、同步批量、异步单条和异步批量,代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Message message = consumer.receive()
CompletableFuture<Message> message = consumer.receiveAsync();
Messages message = consumer.batchReceive();
CompletableFuture<Messages> message = consumer.batchReceiveAsync();

对于批量接收,也可以设置批量接收的策略,代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
consumer = client.newConsumer()
    .topic(topic)
    .subscriptionName(subscription)
        .batchReceivePolicy(BatchReceivePolicy.builder()
        .maxNumMessages(100)
        .maxNumBytes(1024 * 1024)
        .timeout(200, TimeUnit.MILLISECONDS)
        .build())
    .subscribe();

代码中的参数说明如下:

maxNumMessages:批量接收的最大消息数量。

maxNumBytes:批量接收消息的大小,这里是 1MB。

测试

首先编写 Producer 发送消息的代码,如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void sendMsg(String key, String data) {
    CompletableFuture<MessageId> future = producer.newMessage()
        .key(key)
        .value(data.getBytes()).sendAsync();
    future.handle((v, ex) -> {
        if (ex == null) {
            logger.info("发送消息成功, key:{}, msg: {}", key, data);
        } else {
            logger.error("发送消息失败, key:{}, msg: {}", key, data);
        }
        return null;
    });
    future.join();
    logger.info("发送消息完成, key:{}, msg: {}", key, data);
}

然后编写一个 Consumer 消费消息的代码,如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void start() throws Exception{
    while (true) {
        Message message = consumer.receive();
        String key = message.getKey();
        String data = new String(message.getData());
        String topic = message.getTopicName();
        if (StringUtils.isNotEmpty(data)) {
            try{
                logger.info("收到消息, topic:{}, key:{}, data:{}", topic, key, data);
            }catch(Exception e){
                logger.error("接收消息异常,topic:{}, key:{}, data:{}", topic, key, data, e);
            }
        }
        consumer.acknowledge(message);
    }
}

最后编写一个 Controller 类,调用 Producer 发送消息,代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@RequestMapping("/send")
@ResponseBody
public String send(@RequestParam String key, @RequestParam String data) {
    logger.info("收到消息发送请求, key:{}, value:{}", key, data);
    pulsarProducer.sendMsg(key, data);
    return "success";
}

调用 Producer 发送一条消息,key=key1,data=data1,具体操作为在浏览器中输入下面的 url 后回车:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
http://192.168.157.1:8083/pulsar/send?key=key1&data=data1

可以看到控制台输出下面日志:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
2022-01-08 22:42:33,199 [pulsar-client-io-6-1] [INFO] boot.pulsar.PulsarProducer - 发送消息成功, key:key1, msg: data1
2022-01-08 22:42:33,200 [http-nio-8083-exec-1] [INFO] boot.pulsar.PulsarProducer - 发送消息完成, key:key1, msg: data1
2022-01-08 22:42:33,232 [Thread-22] [INFO] boot.pulsar.PulsarConsumer - 收到消息, topic:persistent://public/default/testTopic, key:key1, data:data1
2022-01-08 22:43:14,498 [pulsar-timer-5-1] [INFO] org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [testTopic] [topicGroup] [7def6] Prefetched messages: 0 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.02 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2022-01-08 22:43:14,961 [pulsar-timer-9-1] [INFO] org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - [testTopic] [standalone-9-0] Pending messages: 0 --- Publish throughput: 0.02 msg/s --- 0.00 Mbit/s --- Latency: med: 69.000 ms - 95pct: 69.000 ms - 99pct: 69.000 ms - 99.9pct: 69.000 ms - max: 69.000 ms --- Ack received rate: 0.02 ack/s --- Failed messages: 0

从日志中看到,这里使用的 namespace 就是创建集群时生成的public/default。

总结

从 SpringBoot 整合 Java 客户端使用来看,Pulsar 的 api 是非常友好的,使用起来方便简洁。Consumer 的使用需要考虑多一些,需要考虑到批量、异步以及订阅类型。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-01-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 君哥聊技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
[翻译]使用 Velocity 构建一个web应用
作为JSP的替代方案,Velocity 经常被用来在应用中生成web页面。使用Velocity生成页面的一些好处有:
LeoXu
2018/08/15
6410
模板引擎Velocity 基础
详细介绍大家可以看官网,传送门放这里了:The Apache Velocity Project
叫我阿杰好了
2022/11/07
5.7K0
模板引擎Velocity 基础
Velocity 语法详解「建议收藏」
Velocity是基于Java的模板引擎,它允许页面设计者引用Java中定义的方法。页面设计者和Java开发者能够同时使用MVC的模式开发网站,这样网页设计者能够把精力放在页面的设计上,程序员也可以把精力放在代码开发上。Velocity把Java代码从Web页面中分离, 使网站可维护性更强,同时也在Java服务器页面(JSPs)或者PHP中提供了可视化交互的选择。
全栈程序员站长
2022/09/14
2.2K0
java velocity 语法_Velocity语法
#set($name = “hello”)说明:velocity中变量是弱类型的。
全栈程序员站长
2022/09/14
2.6K0
Velocity语法大全
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/159008.html原文链接:https://javaforall.cn
全栈程序员站长
2022/09/14
1.1K0
Velocity模板引擎
Velocity主要分为app、context、runtime和一些辅助util几个部分。
JokerDJ
2023/11/27
6230
Velocity模板引擎
Apache Velocity-----基于Java的模板引擎
Apache Velocity是一个基于Java的模板引擎,它提供了一个模板语言去引用由Java代码定义的对象。Velocity是Apache基金会旗下的一个开源软件项目,旨在确保Web应用程序在表示层和业务逻辑层之间的隔离(即MVC设计模式)。
wuweixiang
2018/08/14
10.5K0
Velocity魔法堂系列二:VTL语法详解
一、前言                               Velocity作为历史悠久的模板引擎不单单可以替代JSP作为Java Web的服务端网页模板引擎,而且可以作为普通文本的模板引擎来增强服务端程序文本处理能力。而且Velocity被移植到不同的平台上,如.Net的NVelocity和js的Velocity.js,虽然各平台在使用和实现上略有差别,但大部分语法和引擎核心的实现是一致的,因此学习成本降低不少哦。   最好的学习资源——官网:http://velocity.apache.org
^_^肥仔John
2018/01/18
1.8K0
Velocity魔法堂系列一:入门示例
一、前言                             Velocity作为历史悠久的模板引擎不单单可以替代JSP作为Java Web的服务端网页模板引擎,而且可以作为普通文本的模板引擎来增强服务端程序文本处理能力。而且Velocity被移植到不同的平台上,如.Net的NVelocity和js的Velocity.js,虽然各平台在使用和实现上略有差别,但大部分语法和引擎核心的实现是一致的,因此学习成本降低不少哦。   最好的学习资源——官网:http://velocity.apache.org/
^_^肥仔John
2018/01/18
1.1K0
JAVA安全之Velocity模板注入刨析
关于Velocity模板注入注入之前一直缺乏一个系统性的学习和整理,搜索网上大多数类似的内容都是一些关于漏洞利用的复现,而且大多都仅限于Velocity.evaluate的执行,对于载荷的构造以及执行过程并没有详细的流程分析,于是乎只能自己动手来填坑了~
Al1ex
2024/09/26
3940
JAVA安全之Velocity模板注入刨析
Velocity魔法堂系列三:模板与宿主环境通信
一、前言                             Velocity作为历史悠久的模板引擎不单单可以替代JSP作为Java Web的服务端网页模板引擎,而且可以作为普通文本的模板引擎来增强服务端程序文本处理能力。而且Velocity被移植到不同的平台上,如.Net的 NVelocity和js的Velocity.js,虽然各平台在使用和实现上略有差别,但大部分语法和引擎核心的实现是一致的,因此学习成本降低不少 哦。   最好的学习资源——官网:http://velocity.apache.org
^_^肥仔John
2018/01/18
1.1K0
【大牛经验】Java开源web框架汇总(152款)
“框架”犹如滔滔江水连绵不绝, 知道有它就好,先掌握自己工作和主流的框架; 在研究好用和新框架。 主流框架教程分享在Java帮帮-免费资源网 其他教程需要时间制作,会陆续分享!!! 152款框架,你还知道其他的吗? 留言你用过的web框架 Java开源web框架汇总 1 Struts2 Struts2是一个web应用框架。它不是一个Struts的新的发布版本,而是一个全新的框架。Struts2 是第二代基于Model-View-Controller (MVC)模型的web应用框架。 Struts2是java
Java帮帮
2018/03/15
6.1K0
【大牛经验】Java开源web框架汇总(152款)
Maven、Webx、Velocity学习总结
1. 综述 这两周的时间,通过看指南,看代码,跑实例,对淘宝网web应用开发框架有了一个整体的初步认识,主要包括四个方面: Ø 项目管理工具Maven Ø MVC框架Webx。 Ø Webx中的View层实现——Velocity Ø Webx中的Model持久层实现——Ibatis 2. Maven部分 2.1. Maven基础 1. 坐标是指项目的唯一标识,以GAV(groupId,artifactId和version)区分, groupId包含公司和内部组(或产品线)的信息,如com.taobao.m
挑战者
2018/06/29
1.2K0
java velocity 语法_Velocity 语法
类似 name,为空时原样打印。但可以将变量和连续的字符串分隔,例如:{name}space。
全栈程序员站长
2022/09/14
3.3K0
构建现代Web应用时究竟是选择传统web应用还是SPA
在大前端盛行的今天,似乎前后端分离的开发模式才是大势所趋,而SPA的概念更是应运而生。现在随便构建一个web应用程序如果你不是使用SPA的话,就会感觉有点low,但是真的是这样吗?今天这篇文章我们就来一起探讨下,构建现代web应用时该如何进行选择。
依乐祝
2019/04/01
1.6K0
velocity定义_velocity模板
其中,if可以识别为true的内容为:非null(java的String空串,也是识别为true!)、Boolean=true
全栈程序员站长
2022/11/02
1.3K0
velocity定义_velocity模板
1.Thymeleaf 2.FreeMaker 3.Enjoy 4.Velocity 5.JSP
      1.Thymeleaf 在有网络和无网络的环境下皆可运行,即它可以让美工在浏览器查看页面的静态效果,也可以让程序员在服务器查看带数据的动态页面效果。这是由于它支持 html 原型,然后在 html 标签里增加额外的属性来达到模板+数据的展示方式。浏览器解释 html 时会忽略未定义的标签属性,所以 thymeleaf 的模板可以静态地运行;当有数据返回到页面时,Thymeleaf 标签会动态地替换掉静态内容,使页面动态显示。
zhangjiqun
2024/12/17
1300
1.Thymeleaf 2.FreeMaker 3.Enjoy 4.Velocity 5.JSP
Spring Boot开发Web应用
《SpringBoot快速入门》 中我们完成了一个简单的RESTful Service,体验了快速开发的特性。在留言中也有朋友提到如何把处理结果渲染到页面上。那么本篇就在上篇基础上介绍一下如何进行Web应用的开发。 静态资源访问 在我们开发Web应用的时候,需要引用大量的js、css、图片等静态资源。 默认配置 Spring Boot默认提供静态资源目录位置需置于classpath下,目录名需符合如下规则: /static /public /resources /META-INF/resources 举例:
程序猿DD
2018/02/01
1.1K0
[译]构建现代Web应用的安全指南
原文:Security for building modern web apps 译者:杰微刊—张迪 这篇文章的灵感来自于另一篇文章,它是关于“在今天,构建Web应用之前要知道的事情”的。并不长,但遗漏了一些关于安全性的建议,所以我就此动笔,分享一些这方面的知识。 本文重点是写给那些来自初创公司,并且想要从头开始开发一个Web应用的开发者,他们并不知道太多信息安全的知识,也不想花太多时间考虑其应用程序的安全性。一些重要的内容就不在这里讨论了,诸如威胁建模(threat modeling),持续交付安全(co
逸鹏
2018/04/09
1.2K0
Angular:构建现代Web应用的终极选择
Angular 是一款由 Google 推出的强大的前端开发框架,它具有丰富的功能和灵活的架构,被广泛应用于构建现代化的Web应用。本文将介绍Angular框架的特点、优势以及适用场景,帮助读者更好地了解并利用这一终极选择来构建出优秀的Web应用。
人不走空
2024/02/21
5160
Angular:构建现代Web应用的终极选择
相关推荐
[翻译]使用 Velocity 构建一个web应用
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档