首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Spring Boot 进阶:Spring Boot 整合 RocketMQ,轻松实现异步通信

Spring Boot 进阶:Spring Boot 整合 RocketMQ,轻松实现异步通信

原创
作者头像
技术文章分析
发布2025-09-21 15:17:53
发布2025-09-21 15:17:53
3120
举报
文章被收录于专栏:技术技术

Apache RocketMQ 是一个分布式消息中间件,以其高吞吐量、高可用性、低延迟和高可靠性而闻名。它广泛应用于异步解耦、流量削峰、日志收集、分布式事务等场景。Spring Boot 通过其自动配置和 Starter 机制,极大地简化了与 RocketMQ 的集成过程。

本教程将带你从零开始,一步步整合 Spring Boot 与 RocketMQ,实现消息的发送与消费。


一、环境准备

1.1 安装 RocketMQ

  1. 下载: 下载最新稳定版(如 4.9.4)。
  2. 启动 NameServer:bash深色版本# Linux/Mac nohup sh bin/mqnamesrv & # Windows start mqnamesrv.cmd 默认监听 9876 端口。
  3. 启动 Broker:bash深色版本# Linux/Mac nohup sh bin/mqbroker -n localhost:9876 & # Windows start mqbroker.cmd -n localhost:9876 默认监听 10911 端口。

注意:生产环境需要更复杂的部署(如多节点、主从同步)和配置。

代码语言:javascript
复制
WWW.YUYOUKE.NET94丨FREE.KEZINI.CN14丨LIVE.91YOUYU.CN41丨MOBI.HONGDA0536.CN97
WWW.MZB.MOBI16丨SHARE.LSLJX.COM84丨SINA.MXJYCY.CN72丨ZUQIU.SDDXF.CN30
JRS.YIYUANJINGPIN.COM76丨WEIBO.CHANGTAI333.COM10丨LIVE.XCHW.NET39丨24K.ZYZXHM.COM31
SWEET.SANGONGZY.COM23丨SHARE.SDDANTUOJI.CN85丨SAISHI.CHINAHBI.CN49丨MOBI.10ZHANPAI.COM94
MAP.WKSKJX.CN90丨ZHIBO8.CHRONOPORTAL.CN69丨SJB.LYSCK.COM52丨JRS.ZHENGXIANGJINSHU.COM24
WAP.DGRYMY.CN27丨SAISHI.010JIU.COM73丨TV.WEISHENGMM.COM19丨SINA.SDDANTUOJI.CN52
IQIYI.WXYUNTIAN.COM21丨WWW.LOONGXIN-INFO.COM87丨CCTV.ZQIPR.COM85丨LVYIN.HZSMC.COM17
JRS.ZYZXHM.COM51丨SINA.DDHT888.COM44丨MAP.WZJGFC120.COM18丨PRETTY.HBST123.COM95
ZUQIU.FJKQW.CN95丨ZUQIU.XJQQMAIL.COM52丨JRS.CYDDW.CN96丨SHARE.LINQIANGUOJI.CN17
ZHIBO8.JSLYDQGS.COM57丨NBA.FJFYW.NET30丨ZHIBO.ZHIXINQG.COM67丨QQ.XMJIUSHI.CN74
WEIBO.0318SEO.COM97丨LVYIN.HENGHUASZ.COM47丨SAISHI.JSRQAZ.COM24丨VIP.LYGFGBJ.COM52
BOLL.CDYIBEINENGYUAN.COM95丨CCTV.XF-PV.COM19丨SINA.XJQQMAIL.COM49丨BOLL.JS-SJL.CN51
ZUQIU.RUSHANJOB.COM75丨ZHIBO8.ZKAFX120.COM75丨MAP.BXRCW.CN10丨YES.LINQIANGUOJI.CN10

1.2 创建 Spring Boot 项目

使用 Spring Initializr 创建项目,添加以下依赖:

  • Spring Web
  • Lombok (可选,用于简化代码)
  • RocketMQ

或者在 pom.xml 中手动添加:

代码语言:javascript
复制
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <!-- Spring Boot RocketMQ Starter -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.3</version> <!-- 请检查最新版本 -->
    </dependency>

    <!-- Lombok (可选) -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>provided</scope>
    </dependency>
</dependencies>

二、配置 RocketMQ

application.yml (或 application.properties) 中配置 RocketMQ 服务器地址。

代码语言:javascript
复制
# application.yml
server:
  port: 8080

# RocketMQ 配置
rocketmq:
  name-server: localhost:9876 # NameServer 地址
  producer:
    group: my-producer-group    # 生产者组名
  consumer:
    group: my-consumer-group    # 消费者组名

三、定义消息实体

创建一个简单的 POJO 来表示消息内容。

代码语言:javascript
复制
// src/main/java/com/example/demo/model/Order.java
package com.example.demo.model;

import lombok.Data;

@Data
public class Order {
    private Long id;
    private String name;
    private Double amount;
}

四、发送消息 (Producer)

4.1 使用 RocketMQTemplate

RocketMQTemplate 是 Spring Boot RocketMQ Starter 提供的核心模板类,用于发送各种类型的消息。

代码语言:javascript
复制
// src/main/java/com/example/demo/service/OrderService.java
package com.example.demo.service;

import com.example.demo.model.Order;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class OrderService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    // 同步发送消息
    public void createOrder(Order order) {
        // 1. 构建消息
        Message<Order> message = MessageBuilder
                .withPayload(order)
                .setHeader("X-TYPE", "CREATE_ORDER") // 可添加自定义头
                .build();

        // 2. 发送同步消息
        // sendInTransaction: 事务消息
        // asyncSend: 异步发送
        String destination = "order-topic:create"; // topic:tag
        SendResult sendResult = rocketMQTemplate.syncSend(destination, message);
        System.out.printf("订单创建消息发送结果: %s%n", sendResult);
    }
}

4.2 发送不同类型的消息

代码语言:javascript
复制
// 在 OrderService 中添加方法

// 异步发送
public void sendAsyncOrder(Order order) {
    rocketMQTemplate.asyncSend("order-topic:create", order, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.printf("异步发送成功: %s%n", sendResult);
        }

        @Override
        public void onException(Throwable e) {
            System.out.printf("异步发送失败: %s%n", e.getMessage());
        }
    });
}

// 单向发送 (只管发,不关心结果,适用于日志等)
public void sendOneWayOrder(Order order) {
    rocketMQTemplate.sendOneWay("order-topic:create", order);
}

// 延迟消息 (例如,10秒后处理)
public void sendDelayedOrder(Order order) {
    // 延迟等级: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    rocketMQTemplate.syncSend("order-topic:create", order, 3000, 3); // 3代表10s
}

4.3 消息发送端控制器

代码语言:javascript
复制
// src/main/java/com/example/demo/controller/OrderController.java
package com.example.demo.controller;

import com.example.demo.model.Order;
import com.example.demo.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/orders")
public class OrderController {

    @Autowired
    private OrderService orderService;

    @PostMapping
    public String createOrder(@RequestBody Order order) {
        orderService.createOrder(order);
        return "订单创建请求已接收";
    }
}

五、消费消息 (Consumer)

5.1 使用 @RocketMQMessageListener 注解

这是最简单的方式,Spring Boot 会自动创建消费者并监听指定的 Topic。

代码语言:javascript
复制
// src/main/java/com/example/demo/consumer/OrderConsumer.java
package com.example.demo.consumer;

import com.example.demo.model.Order;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
@RocketMQMessageListener(
    topic = "order-topic",           // 监听的 Topic
    consumerGroup = "my-consumer-group", // 消费者组,必须唯一
    selectorExpression = "create"    // Tag 过滤,只消费 tag 为 "create" 的消息
)
public class OrderConsumer implements RocketMQListener<Order> {

    private static final Logger log = LoggerFactory.getLogger(OrderConsumer.class);

    @Override
    public void onMessage(Order order) {
        log.info("收到订单消息: {}", order);
        // 在这里处理业务逻辑,例如:
        // 1. 保存订单到数据库
        // 2. 发送邮件/短信通知
        // 3. 更新库存
        // ...
        System.out.println("已处理订单: " + order.getName());
    }
}

5.2 高级消费者配置

代码语言:javascript
复制
@RocketMQMessageListener(
    topic = "order-topic",
    consumerGroup = "my-consumer-group",
    selectorExpression = "*",
    consumeMode = ConsumeMode.CONCURRENTLY, // 并发消费 (默认)
    // consumeMode = ConsumeMode.ORDERLY,   // 顺序消费
    messageModel = MessageModel.CLUSTERING, // 集群模式 (默认)
    // messageModel = MessageModel.BROADCASTING, // 广播模式
    consumeThreadMax = 64, // 最大消费线程数
    consumeTimeout = 30000 // 消费超时时间 (ms)
)

六、测试

  1. 启动 Spring Boot 应用。
  2. 使用 curl 或 Postman 发送 POST 请求:bash深色版本curl -X POST http://localhost:8080/orders \ -H "Content-Type: application/json" \ -d '{"id": 1, "name": "iPhone 15", "amount": 9999.0}'
  3. 观察控制台输出:
    • 生产者端应打印类似 SEND_OK 的发送结果。
    • 消费者端应打印收到的订单信息。

七、关键概念与最佳实践

7.1 核心概念

  • Producer (生产者):发送消息的应用程序。
  • Consumer (消费者):接收并处理消息的应用程序。
  • Topic (主题):消息的逻辑分类。一个 Topic 可以被多个 Consumer Group 订阅。
  • Tag (标签):对同一 Topic 下的消息进行更细粒度的划分。Topic:Tag 是常见的用法。
  • NameServer:路由管理服务,Producer 和 Consumer 通过它获取 Broker 的路由信息。
  • Broker:消息中转角色,负责存储消息、转发消息。
  • Consumer Group:一组消费者的集合。同一 Group 内的消费者共同消费一个 Topic 的消息(负载均衡)。不同 Group 之间是广播关系。

7.2 消费模式

  • 并发消费 (CONCURRENTLY):多个线程同时处理消息,吞吐量高,但消息处理顺序无法保证。
  • 顺序消费 (ORDERLY):保证同一个 Queue 内的消息按发送顺序被消费。适用于订单状态流转等场景。

7.3 消息模型

  • 集群模式 (CLUSTERING):同一 Consumer Group 内,每条消息只会被一个消费者实例消费(负载均衡)。
  • 广播模式 (BROADCASTING):同一 Consumer Group 内,每条消息会被所有消费者实例消费。

7.3 最佳实践

  1. 生产者
    • 使用 syncSend 确保关键消息发送成功。
    • 设置合理的 retryTimesWhenSendFailedretryTimesWhenSendAsyncFailed
    • 为消息添加必要的业务标识(如 X-REQ-ID)便于追踪。
  2. 消费者
    • 幂等性:确保消费者处理消息是幂等的,因为 RocketMQ 保证的是至少一次投递。
    • 异常处理:在 onMessage 中捕获所有异常,避免消费者线程崩溃。可以将处理失败的消息记录到数据库或死信队列。
    • 性能:合理设置 consumeThreadMax,避免线程过多导致系统负载过高。
  3. 监控:使用 RocketMQ Dashboard 或集成 Prometheus/Grafana 监控消息堆积、发送/消费延迟等指标。

通过 Spring Boot 的 rocketmq-spring-boot-starter,整合 RocketMQ 变得异常简单:

  1. 添加 Starter 依赖。
  2. 配置 name-server 地址。
  3. 使用 RocketMQTemplate 发送消息。
  4. 使用 @RocketMQMessageListener 注解定义消费者。

RocketMQ 的高可靠性和 Spring Boot 的便捷性相结合,为构建高性能、高可用的分布式系统提供了强大的异步通信能力。掌握这些基础,你可以进一步探索事务消息、延迟消息、批量消息等高级特性。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、环境准备
    • 1.1 安装 RocketMQ
    • 1.2 创建 Spring Boot 项目
  • 二、配置 RocketMQ
  • 三、定义消息实体
  • 四、发送消息 (Producer)
    • 4.1 使用 RocketMQTemplate
    • 4.2 发送不同类型的消息
    • 4.3 消息发送端控制器
  • 五、消费消息 (Consumer)
    • 5.1 使用 @RocketMQMessageListener 注解
    • 5.2 高级消费者配置
  • 六、测试
  • 七、关键概念与最佳实践
    • 7.1 核心概念
    • 7.2 消费模式
    • 7.3 消息模型
    • 7.3 最佳实践
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档