
Apache RocketMQ 是一个分布式消息中间件,以其高吞吐量、高可用性、低延迟和高可靠性而闻名。它广泛应用于异步解耦、流量削峰、日志收集、分布式事务等场景。Spring Boot 通过其自动配置和 Starter 机制,极大地简化了与 RocketMQ 的集成过程。
本教程将带你从零开始,一步步整合 Spring Boot 与 RocketMQ,实现消息的发送与消费。
9876 端口。10911 端口。注意:生产环境需要更复杂的部署(如多节点、主从同步)和配置。
WWW.YUYOUKE.NET94丨FREE.KEZINI.CN14丨LIVE.91YOUYU.CN41丨MOBI.HONGDA0536.CN97
WWW.MZB.MOBI16丨SHARE.LSLJX.COM84丨SINA.MXJYCY.CN72丨ZUQIU.SDDXF.CN30
JRS.YIYUANJINGPIN.COM76丨WEIBO.CHANGTAI333.COM10丨LIVE.XCHW.NET39丨24K.ZYZXHM.COM31
SWEET.SANGONGZY.COM23丨SHARE.SDDANTUOJI.CN85丨SAISHI.CHINAHBI.CN49丨MOBI.10ZHANPAI.COM94
MAP.WKSKJX.CN90丨ZHIBO8.CHRONOPORTAL.CN69丨SJB.LYSCK.COM52丨JRS.ZHENGXIANGJINSHU.COM24
WAP.DGRYMY.CN27丨SAISHI.010JIU.COM73丨TV.WEISHENGMM.COM19丨SINA.SDDANTUOJI.CN52
IQIYI.WXYUNTIAN.COM21丨WWW.LOONGXIN-INFO.COM87丨CCTV.ZQIPR.COM85丨LVYIN.HZSMC.COM17
JRS.ZYZXHM.COM51丨SINA.DDHT888.COM44丨MAP.WZJGFC120.COM18丨PRETTY.HBST123.COM95
ZUQIU.FJKQW.CN95丨ZUQIU.XJQQMAIL.COM52丨JRS.CYDDW.CN96丨SHARE.LINQIANGUOJI.CN17
ZHIBO8.JSLYDQGS.COM57丨NBA.FJFYW.NET30丨ZHIBO.ZHIXINQG.COM67丨QQ.XMJIUSHI.CN74
WEIBO.0318SEO.COM97丨LVYIN.HENGHUASZ.COM47丨SAISHI.JSRQAZ.COM24丨VIP.LYGFGBJ.COM52
BOLL.CDYIBEINENGYUAN.COM95丨CCTV.XF-PV.COM19丨SINA.XJQQMAIL.COM49丨BOLL.JS-SJL.CN51
ZUQIU.RUSHANJOB.COM75丨ZHIBO8.ZKAFX120.COM75丨MAP.BXRCW.CN10丨YES.LINQIANGUOJI.CN10使用 Spring Initializr 创建项目,添加以下依赖:
Spring WebLombok (可选,用于简化代码)RocketMQ或者在 pom.xml 中手动添加:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot RocketMQ Starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version> <!-- 请检查最新版本 -->
</dependency>
<!-- Lombok (可选) -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>在 application.yml (或 application.properties) 中配置 RocketMQ 服务器地址。
# application.yml
server:
port: 8080
# RocketMQ 配置
rocketmq:
name-server: localhost:9876 # NameServer 地址
producer:
group: my-producer-group # 生产者组名
consumer:
group: my-consumer-group # 消费者组名创建一个简单的 POJO 来表示消息内容。
// src/main/java/com/example/demo/model/Order.java
package com.example.demo.model;
import lombok.Data;
@Data
public class Order {
private Long id;
private String name;
private Double amount;
}RocketMQTemplateRocketMQTemplate 是 Spring Boot RocketMQ Starter 提供的核心模板类,用于发送各种类型的消息。
// src/main/java/com/example/demo/service/OrderService.java
package com.example.demo.service;
import com.example.demo.model.Order;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class OrderService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 同步发送消息
public void createOrder(Order order) {
// 1. 构建消息
Message<Order> message = MessageBuilder
.withPayload(order)
.setHeader("X-TYPE", "CREATE_ORDER") // 可添加自定义头
.build();
// 2. 发送同步消息
// sendInTransaction: 事务消息
// asyncSend: 异步发送
String destination = "order-topic:create"; // topic:tag
SendResult sendResult = rocketMQTemplate.syncSend(destination, message);
System.out.printf("订单创建消息发送结果: %s%n", sendResult);
}
}// 在 OrderService 中添加方法
// 异步发送
public void sendAsyncOrder(Order order) {
rocketMQTemplate.asyncSend("order-topic:create", order, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("异步发送成功: %s%n", sendResult);
}
@Override
public void onException(Throwable e) {
System.out.printf("异步发送失败: %s%n", e.getMessage());
}
});
}
// 单向发送 (只管发,不关心结果,适用于日志等)
public void sendOneWayOrder(Order order) {
rocketMQTemplate.sendOneWay("order-topic:create", order);
}
// 延迟消息 (例如,10秒后处理)
public void sendDelayedOrder(Order order) {
// 延迟等级: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
rocketMQTemplate.syncSend("order-topic:create", order, 3000, 3); // 3代表10s
}// src/main/java/com/example/demo/controller/OrderController.java
package com.example.demo.controller;
import com.example.demo.model.Order;
import com.example.demo.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/orders")
public class OrderController {
@Autowired
private OrderService orderService;
@PostMapping
public String createOrder(@RequestBody Order order) {
orderService.createOrder(order);
return "订单创建请求已接收";
}
}@RocketMQMessageListener 注解这是最简单的方式,Spring Boot 会自动创建消费者并监听指定的 Topic。
// src/main/java/com/example/demo/consumer/OrderConsumer.java
package com.example.demo.consumer;
import com.example.demo.model.Order;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Service
@RocketMQMessageListener(
topic = "order-topic", // 监听的 Topic
consumerGroup = "my-consumer-group", // 消费者组,必须唯一
selectorExpression = "create" // Tag 过滤,只消费 tag 为 "create" 的消息
)
public class OrderConsumer implements RocketMQListener<Order> {
private static final Logger log = LoggerFactory.getLogger(OrderConsumer.class);
@Override
public void onMessage(Order order) {
log.info("收到订单消息: {}", order);
// 在这里处理业务逻辑,例如:
// 1. 保存订单到数据库
// 2. 发送邮件/短信通知
// 3. 更新库存
// ...
System.out.println("已处理订单: " + order.getName());
}
}@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "my-consumer-group",
selectorExpression = "*",
consumeMode = ConsumeMode.CONCURRENTLY, // 并发消费 (默认)
// consumeMode = ConsumeMode.ORDERLY, // 顺序消费
messageModel = MessageModel.CLUSTERING, // 集群模式 (默认)
// messageModel = MessageModel.BROADCASTING, // 广播模式
consumeThreadMax = 64, // 最大消费线程数
consumeTimeout = 30000 // 消费超时时间 (ms)
)curl 或 Postman 发送 POST 请求:bash深色版本curl -X POST http://localhost:8080/orders \ -H "Content-Type: application/json" \ -d '{"id": 1, "name": "iPhone 15", "amount": 9999.0}'SEND_OK 的发送结果。Topic:Tag 是常见的用法。CONCURRENTLY):多个线程同时处理消息,吞吐量高,但消息处理顺序无法保证。ORDERLY):保证同一个 Queue 内的消息按发送顺序被消费。适用于订单状态流转等场景。CLUSTERING):同一 Consumer Group 内,每条消息只会被一个消费者实例消费(负载均衡)。BROADCASTING):同一 Consumer Group 内,每条消息会被所有消费者实例消费。syncSend 确保关键消息发送成功。retryTimesWhenSendFailed 和 retryTimesWhenSendAsyncFailed。X-REQ-ID)便于追踪。onMessage 中捕获所有异常,避免消费者线程崩溃。可以将处理失败的消息记录到数据库或死信队列。consumeThreadMax,避免线程过多导致系统负载过高。通过 Spring Boot 的 rocketmq-spring-boot-starter,整合 RocketMQ 变得异常简单:
name-server 地址。RocketMQTemplate 发送消息。@RocketMQMessageListener 注解定义消费者。RocketMQ 的高可靠性和 Spring Boot 的便捷性相结合,为构建高性能、高可用的分布式系统提供了强大的异步通信能力。掌握这些基础,你可以进一步探索事务消息、延迟消息、批量消息等高级特性。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。