前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >【RocketMQ】004-Spring Boot 集成 RocketMQ

【RocketMQ】004-Spring Boot 集成 RocketMQ

作者头像
訾博ZiBo
发布2025-01-06 19:05:05
发布2025-01-06 19:05:05
25600
代码可运行
举报
运行总次数:0
代码可运行

【RocketMQ】004-Spring Boot 集成 RocketMQ

一、基本使用

1、创建 Spring Boot 项目,并引入 RocketMQ 依赖

代码语言:javascript
代码运行次数:0
复制
<!--Rocket MQ-->
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

2、application.yml 配置

代码语言:javascript
代码运行次数:0
复制
rocketmq:
  # NameServer地址
  name-server: localhost:9876
  # 生产者
  producer:
    # 发送同一类消息的设置为同一个group,保证唯一
    group: springboot_producer_group
    # 发送消息超时时间,默认3000
    sendMessageTimeout: 10000
    # 发送消息失败重试次数,默认2
    retryTimesWhenSendFailed: 2
    # 异步消息重试此处,默认2
    retryTimesWhenSendAsyncFailed: 2
    # 消息最大长度,默认1024 * 1024 * 4(默认4M)
    maxMessageSize: 4096
    # 压缩消息阈值,默认4k(1024 * 4)
    compressMessageBodyThreshold: 4096
    # 是否在内部发送失败时重试另一个broker,默认false
    retryNextServer: false
  # 消费者
  consumer:
    group: springboot_consumer_group
    # 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
    pull-batch-size: 10

3、消息生产者

代码语言:javascript
代码运行次数:0
复制
package com.example.mq.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * mq 生产者
 *
 * @author zibo
 * @date 2023/5/17 15:48
 * @slogan 慢慢学,不要停。
 */
@Slf4j
@Service
public class MQProducerService {

    // 直接注入使用,用于发送消息到 broker 服务器
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送普通消息
     *
     * @param msg 消息可以是任何对象,如:String、Map、对象等
     */
    public void send(String msg) {
        // 写法一
        rocketMQTemplate.convertAndSend("springboot_topic:test", msg);
        // 写法二
        // rocketMQTemplate.send("springboot_topic:test", MessageBuilder.withPayload(msg).build());
    }
}

4、消息消费者

代码语言:javascript
代码运行次数:0
复制
package com.example.mq.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * mq 消费者
 *
 * @author zibo
 * @date 2023/5/17 15:48
 * @slogan 慢慢学,不要停。
 */
@Slf4j
@Service
@RocketMQMessageListener(topic = "springboot_topic", selectorExpression = "test", consumerGroup = "springboot_consumer_group")
public class MQConsumerService implements RocketMQListener<String> {

    // 监听到消息就会执行此方法
    @Override
    public void onMessage(String msg) {
        log.info("监听到消息:msg={}", msg);
    }
}

5、消息调用接口

代码语言:javascript
代码运行次数:0
复制
package com.example.mq.controller;

import com.example.mq.service.MQProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * mq 接口
 *
 * @author zibo
 * @date 2023/5/17 15:48
 * @slogan 慢慢学,不要停。
 */
@RestController
@RequestMapping("/rocketmq")
public class RocketMQController {

    @Autowired
    private MQProducerService mqProducerService;

    @GetMapping("/send")
    public void send() {
        mqProducerService.send("测试消息");
    }

}

6、启动 RocketMQ

  • 启动 NameServer :双击 mqnamesrv.cmd 启动即可!
  • 启动 Broker :双击 mqbroker.cmd 启动即可!

7、启动项目,并访问

http://localhost:8080/rocketmq/send

控制台日志

代码语言:javascript
代码运行次数:0
复制
2023-05-17 16:04:54.126  INFO 19772 --- [onsumer_group_1] c.example.mq.service.MQConsumerService   : 监听到消息:msg=测试消息

8、启动 rocketmq-dashboard ,并查看

主题
image-20230517160840104
image-20230517160840104
消费者
image-20230517161012491
image-20230517161012491
消息
image-20230517161106989
image-20230517161106989

二、常用消息种类

1、常用消息种类

  1. 普通消息(Normal Message):普通消息是最常用的消息类型,一旦发送就会立即被投递给消费者进行消费
  2. 延时消息(Delay Message):延时消息是指发送后延迟一段时间后再投递给消费者。你可以指定延时级别,例如延迟 10 秒、1 分钟、1 小时等。
  3. 顺序消息(Orderly Message):顺序消息是指保证消息按照发送的顺序被消费的消息类型。你可以为同一消息队列中的消息指定相同的消息队列选择器(Message Queue Selector),从而保证消息按照发送顺序被消费。
  4. 事务消息(Transaction Message):事务消息是指将消息发送与本地事务操作相结合,可以保证消息和本地事务的最终一致性。发送事务消息时,你需要实现事务监听器(Transaction Listener)来执行本地事务和提交事务状态。
  5. 批量消息(Batch Message):批量消息是一次发送多条消息的方式,可以减少网络开销和提高消息吞吐量。你可以将多个消息封装成一个消息列表,然后使用批量发送方法一次性发送。

2、普通消息

代码示例
代码语言:javascript
代码运行次数:0
复制
rocketMQTemplate.convertAndSend("springboot_topic:test", msg);
代码解释
  • rocketMQTemplate:是 RocketMQ 提供的用于发送消息的模板类,需要在 Spring Boot 中配置和注入。
  • convertAndSend方法:是 RocketMQTemplate 类的方法,用于将消息对象转换并发送消息。它接受两个参数:
    • "springboot_topic:test":是要发送消息的目标主题和标签。在这个示例中,主题"springboot_topic"标签"test"。你可以根据实际情况修改主题和标签。
    • msg:是要发送的消息内容。它可以是字符串、对象或其他数据类型。RocketMQTemplate 会根据消息内容的类型进行转换。
单向消息
代码语言:javascript
代码运行次数:0
复制
/**
 * 发送单向消息
 *
 * @param msg 消息可以是任何对象,如:String、Map、对象等
 */
public void sendOneWay(String msg) {
    rocketMQTemplate.sendOneWay("springboot_topic:test", msg);
}
同步消息
代码语言:javascript
代码运行次数:0
复制
/**
 * 发送同步消息
 *
 * @param msg 消息可以是任何对象,如:String、Map、对象等
 */
public SendResult sendSync(String msg) {
    SendResult result = rocketMQTemplate.syncSend("springboot_topic:test", msg);
    log.info("发送结果:{}", result);
    return result;
}
异步消息
代码语言:javascript
代码运行次数:0
复制
/**
 * 发送异步消息
 *
 * @param msg 消息可以是任何对象,如:String、Map、对象等
 */
public void sendAsync(String msg) {
    rocketMQTemplate.asyncSend("springboot_topic:test", msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            log.info("发送成功");
        }

        @Override
        public void onException(Throwable throwable) {
            log.info("发送失败");
        }
    });
}

3、延时消息

代码语言:javascript
代码运行次数:0
复制
/**
 * 发送延时消息
 *
 * @param msg 消息可以是任何对象,如:String、Map、对象等
 */
public void sendDelay(String msg) {
    rocketMQTemplate.syncSendDelayTimeSeconds("springboot_topic:test", msg, 5);
}

4、顺序消息

代码语言:javascript
代码运行次数:0
复制
/**
 * 发送顺序消息
 *
 * @param msg 消息可以是任何对象,如:String、Map、对象等
 */
public void sendOrderly(String msg) {
    // 第一条
    rocketMQTemplate.syncSendOrderly("springboot_topic:test", msg, "1");
    // 第二条
    rocketMQTemplate.syncSendOrderly("springboot_topic:test", msg, "2");
    // 第三条
    rocketMQTemplate.syncSendOrderly("springboot_topic:test", msg, "3");
}

5、事务消息

6、批量消息

代码语言:javascript
代码运行次数:0
复制
/**
 * 发送批量消息
 *
 * @param msgList 消息列表
 */
public void sendBatch(List<String> msgList) {
    List<Message<String>> rocketMQMessages = new ArrayList<>();
    for (String msg : msgList) {
        rocketMQMessages.add(MessageBuilder.withPayload(msg).build());
    }
    rocketMQTemplate.syncSend("springboot_topic:test", rocketMQMessages);
}

三、参考文章

SpringBoot整合RocketMQ,老鸟们都是这么玩的!

https://juejin.cn/post/7220075270664405052

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-01-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 【RocketMQ】004-Spring Boot 集成 RocketMQ
  • 一、基本使用
    • 1、创建 Spring Boot 项目,并引入 RocketMQ 依赖
    • 2、application.yml 配置
    • 3、消息生产者
    • 4、消息消费者
    • 5、消息调用接口
    • 6、启动 RocketMQ
    • 7、启动项目,并访问
    • 8、启动 rocketmq-dashboard ,并查看
      • 主题
      • 消费者
      • 消息
  • 二、常用消息种类
    • 1、常用消息种类
    • 2、普通消息
      • 代码示例
      • 代码解释
      • 单向消息
      • 同步消息
      • 异步消息
    • 3、延时消息
    • 4、顺序消息
    • 5、事务消息
    • 6、批量消息
  • 三、参考文章
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档