大家好,我是【JavaDog程序狗】
今天跟大家分享pulsar,一个分布式的消息发布/订阅传递平台。
本狗以身入局,将pulsar的使用场景,结合实际使用案例,进行大白话分析。
通过简单代码demo进行讲解,pulsar在java中如何使用?如何通过pulsar进行异步解耦?......等
狗哥最近在整理学习笔记时,偶然在百度发现pulsar相关的教程竟然大部分付费会员才能看,淦!
首先,我不反对知识付费,但是花钱才能看总感觉差那么一点意思。
于是乎,狗哥将我司日常使用的消息队列pulsar进行总结整理,让大家一次性免费学个够,不付费也能学的酣畅淋漓
Apache Pulsar 是一个高性能、可扩展且灵活的分布式消息传递和流处理平台
👽人话解释
Pulsar 就是一个消息中间件,和Kafka、RocketMQ功能差不多,多用于削峰解耦
******
👽人话解释
Pulsar 就像是一个快递中转站,但它不仅能高效地处理大量的包裹(消息),还能确保每个包裹都能准确无误地送达目的地(消费者)
******
Pulsar | ActiveMQ | RabbitMQ | RocketMQ | Kafka | |
---|---|---|---|---|---|
单机吞吐量 | 十万级 | 万级 | 万级 | 十万级 | 十万级 |
开发语言 | Java | Java | Erlang | Java | Java/Scala |
维护者 | Apache | Apache | Spring | Apache | Apache |
社区活跃度 | 高 | 低 | 高 | 高 | 高 |
消费模式 | 独占、共享、灾备、key共享 | P2P、Pub-Sub | direct、topic、Headers、fanout | 基于Topic和MessageTag | 基于Topic的Pub-Sub |
顺序消息 | 支持 | 不支持 | 不支持 | 支持 | 支持 |
稳定性 | 一般 | 好 | 好 | 一般 | 较差 |
集群支持 | 集群模式 | 主备模式 | 复制模式 | 主备模式 | 集群可扩展性强 |
关于消息队列的选取,在实际案例中取决于你的具体需求和技术背景
******
小伙伴如果不会在windows上安装docker,请查看狗哥之前文章
拉取pulsar2.9.2版本镜像
在Windows PowerShell,输入拉取pulsar镜像命令
docker pull apachepulsar/pulsar:2.9.2
启动pulsar2.9.2单机版
docker run -it -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:2.9.2 bin/pulsar standalone
使用Ctrl+P+Q组合键。当你在容器内部时,按下这个组合键可以将当前的终端会话断开,同时保持容器继续运行。
Ctrl+P+Q
******
什么是pulsar-manager?
Pulsar Manager 是 Apache Pulsar 的一个管理工具,它提供了一个用户界面和 RESTful API 用于管理和监控 Pulsar 集群。 Pulsar Manager 可以帮助管理员执行一系列集群管理任务,例如创建和删除命名空间、管理 topic、查看集群状态等。
👽人话解释
pulsar-manager就是一个pulsar的可视化工具,功能就像Navicat差不多
拉取pulsar-manager0.2.0版本镜像
在Windows PowerShell,输入拉取pulsar镜像命令
docker pull apachepulsar/pulsar-manager:v0.2.0
启动pulsar-manager v0.2.0
docker run -it -p 9527:9527 -p 7750:7750 -e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties apachepulsar/pulsar-manager:v0.2.0
使用Ctrl+P+Q组合键。当你在容器内部时,按下这个组合键可以将当前的终端会话断开,同时保持容器继续运行。
Ctrl+P+Q
设置账号为admin,密码为apachepulsar
CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token)
curl \
-H 'X-XSRF-TOKEN: $CSRF_TOKEN' \
-H 'Cookie: XSRF-TOKEN=$CSRF_TOKEN;' \
-H "Content-Type: application/json" \
-X PUT http://localhost:7750/pulsar-manager/users/superuser \
-d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "xxx@test.org"}'
访问 http://localhost:9527/
输入账号:admin 密码:apachepulsar
新增环境,填写环境名,及启动的pulsar单体应用地址
******
🌰场景实例
张三在电商平台买了一个产品,支付成功后,张三等着收货就好...
但是在程序业务视角来看,支付成功后,其实还有很多下游服务在默默执行。
如库存、物流、订单服务都会有相应逻辑执行。
我们就模拟真实电商案例,来讲解下如何使用pulsar以及pulsar能解决什么痛点问题。
张三支付成功后,更新订单,更新库存,更新物流...串行操作,每个服务都耗时2秒
这种串行的执行方式有很大的问题,如果整个链路串行执行,那么响应的时间就是每个业务执行时间想加,更新订单(2秒),更新库存(2秒),更新物流(2秒),总共耗时6秒。
如果还有其他下游业务,链路时间会一直叠加,造成张三用户访问等待时长,并且如果链路中有失败,则会导致整个链路异常
张三支付成功后,更新订单,更新库存,更新物流...并行操作
这种使用异步解耦方式,每个服务都异步执行,响应立刻返回,用户体验绝佳
******
我们将上面六中的两种情况进行代码实操,串行执行和使用pulsar异步解耦
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
pulsar:
#支付topic
pay-topic: persistent://public/default/pay-topic
#支付subscription
pay-subscription: pay-subscription
#订单topic
order-topic: persistent://public/default/order-topic
#订单subscription
order-subscription: order-subscription
#库存topic
stock-topic: persistent://public/default/stock-topic
#库存subscription
stock-subscription: stock-subscription
#物流topic
logistics-topic: persistent://public/default/logistics-topic
#物流subscription
logistics-subscription: logistics-subscription
# Pulsar配置
pulsar:
url: pulsar://192.168.31.27:6650
package net.javadog.pulsar.pay.service.pulsar.impl;
import cn.hutool.core.util.ObjectUtil;
import lombok.extern.slf4j.Slf4j;
import net.javadog.pulsar.logistics.service.pulsar.LogisticsPulsarProducer;
import net.javadog.pulsar.order.service.pulsar.OrderPulsarProducer;
import net.javadog.pulsar.stock.service.pulsar.StockPulsarProducer;
import org.apache.pulsar.client.api.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.net.InetAddress;
import java.util.concurrent.TimeUnit;
/**
* 支付异步通知-消费者
*
* @author hdx
* @version 1.0
* @since 2024.07
*/
@Slf4j
@Component
public class PayPulsarConsumer {
@Value("${pulsar.url}")
private String url;
@Value("${pulsar.pay-topic}")
private String topic;
@Value("${pulsar.pay-subscription}")
private String subscription;
private PulsarClient client = null;
private Consumer<byte[]> consumer = null;
private OrderPulsarProducer orderPulsarProducer;
private StockPulsarProducer stockPulsarProducer;
private LogisticsPulsarProducer logisticsPulsarProducer;
@Resource
public void setStockPulsarProducer(StockPulsarProducer stockPulsarProducer) {
this.stockPulsarProducer = stockPulsarProducer;
}
@Resource
public void setOrderPulsarProducer(OrderPulsarProducer orderPulsarProducer) {
this.orderPulsarProducer = orderPulsarProducer;
}
@Resource
public void setLogisticsPulsarProducer(LogisticsPulsarProducer logisticsPulsarProducer) {
this.logisticsPulsarProducer = logisticsPulsarProducer;
}
/**
* 使用@PostConstruct注解用于在依赖关系注入完成之后需要执行的方法上,以执行任何初始化
*/
@PostConstruct
public void initPulsar() throws Exception {
try {
//构造Pulsar client
client = PulsarClient.builder()
.serviceUrl(url)
.build();
final String ip = InetAddress.getLocalHost().getHostAddress().replaceAll("\\.", "");
//创建consumer
consumer = client.newConsumer()
.topic(topic.split(","))
.consumerName("payPulsarConsumer" + ip)
.subscriptionName(subscription)
//指定消费模式,包含:Exclusive,Failover,Shared,Key_Shared。默认Exclusive模式
.subscriptionType(SubscriptionType.Shared)
//指定从哪里开始消费还有Latest,valueof可选,默认Latest
.subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
//指定消费失败后延迟多久broker重新发送消息给consumer,默认60s
.negativeAckRedeliveryDelay(60, TimeUnit.SECONDS)
.batchReceivePolicy(BatchReceivePolicy.builder()
.maxNumBytes(10*1024*1024)
.maxNumMessages(-1)
.timeout(100, TimeUnit.MILLISECONDS)
.build())
.subscribe();
} catch (Exception e) {
log.error("Pulsar初始化异常:", e);
throw e;
}
}
public void start() throws Exception {
//消费消息
log.debug("支付消费者启动");
while (true) {
Message<byte[]> message = consumer.receive();
log.debug("消费消息中,message:{}", message);
final String id = new String(message.getValue());
if (ObjectUtil.isNotNull(id)) {
try {
messageHandle(id);
consumer.acknowledge(message);
} catch (Exception e) {
log.error("消费Pulsar支付数据异常,key【{}】,orderId【{}】:", message.getKey(), id, e);
}
}
}
}
private void messageHandle(String orderId) {
log.info("【支付消费】,orderId:{}", orderId);
// 订单生产消息
orderPulsarProducer.handler(orderId, 0);
// 库存生产消息
stockPulsarProducer.handler(orderId, 0);
// 物流生产消息
logisticsPulsarProducer.handler(orderId, 0);
}
public void close() {
try {
consumer.close();
} catch (PulsarClientException e) {
log.error("关闭Pulsar消费者失败:", e);
}
try {
client.close();
} catch (PulsarClientException e) {
log.error("关闭Pulsar连接失败:", e);
}
}
}
在pulsar-boot-main下找到Application并运行。注意:pulsar一定先起来!!
在控制台打开打印的swagger地址,进行接口调试
随意录入orderId,点击支付-普通串行支付-方式A接口【Execute】按钮,查看结果
随意录入orderId,点击采用消息队列解耦-方式B接口【Execute】按钮,查看结果
demo代码逻辑简单,业务逻辑清晰,以最直观的响应时间来展示使用pulsar异步解耦优势,并使用优雅的分层使代码结构干净整洁。
希望大家能够下载demo实操一下,好记性不如烂笔头;彻底掌握使用pulsar的小技巧,将其运用到实战中,真正体现它的优点。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。