RocketMQ
依赖<!--Rocket MQ-->
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
application.yml
配置rocketmq:
# NameServer地址
name-server: localhost:9876
# 生产者
producer:
# 发送同一类消息的设置为同一个group,保证唯一
group: springboot_producer_group
# 发送消息超时时间,默认3000
sendMessageTimeout: 10000
# 发送消息失败重试次数,默认2
retryTimesWhenSendFailed: 2
# 异步消息重试此处,默认2
retryTimesWhenSendAsyncFailed: 2
# 消息最大长度,默认1024 * 1024 * 4(默认4M)
maxMessageSize: 4096
# 压缩消息阈值,默认4k(1024 * 4)
compressMessageBodyThreshold: 4096
# 是否在内部发送失败时重试另一个broker,默认false
retryNextServer: false
# 消费者
consumer:
group: springboot_consumer_group
# 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
pull-batch-size: 10
package com.example.mq.service;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* mq 生产者
*
* @author zibo
* @date 2023/5/17 15:48
* @slogan 慢慢学,不要停。
*/
@Slf4j
@Service
public class MQProducerService {
// 直接注入使用,用于发送消息到 broker 服务器
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送普通消息
*
* @param msg 消息可以是任何对象,如:String、Map、对象等
*/
public void send(String msg) {
// 写法一
rocketMQTemplate.convertAndSend("springboot_topic:test", msg);
// 写法二
// rocketMQTemplate.send("springboot_topic:test", MessageBuilder.withPayload(msg).build());
}
}
package com.example.mq.service;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
* mq 消费者
*
* @author zibo
* @date 2023/5/17 15:48
* @slogan 慢慢学,不要停。
*/
@Slf4j
@Service
@RocketMQMessageListener(topic = "springboot_topic", selectorExpression = "test", consumerGroup = "springboot_consumer_group")
public class MQConsumerService implements RocketMQListener<String> {
// 监听到消息就会执行此方法
@Override
public void onMessage(String msg) {
log.info("监听到消息:msg={}", msg);
}
}
package com.example.mq.controller;
import com.example.mq.service.MQProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* mq 接口
*
* @author zibo
* @date 2023/5/17 15:48
* @slogan 慢慢学,不要停。
*/
@RestController
@RequestMapping("/rocketmq")
public class RocketMQController {
@Autowired
private MQProducerService mqProducerService;
@GetMapping("/send")
public void send() {
mqProducerService.send("测试消息");
}
}
RocketMQ
mqnamesrv.cmd
启动即可!mqbroker.cmd
启动即可!http://localhost:8080/rocketmq/send
控制台日志
2023-05-17 16:04:54.126 INFO 19772 --- [onsumer_group_1] c.example.mq.service.MQConsumerService : 监听到消息:msg=测试消息
rocketmq-dashboard
,并查看rocketMQTemplate.convertAndSend("springboot_topic:test", msg);
rocketMQTemplate
:是 RocketMQ 提供的用于发送消息的模板类,需要在 Spring Boot 中配置和注入。"springboot_topic:test"
:是要发送消息的目标主题和标签。在这个示例中,主题是 "springboot_topic"
,标签是 "test"
。你可以根据实际情况修改主题和标签。msg
:是要发送的消息内容。它可以是字符串、对象或其他数据类型。RocketMQTemplate
会根据消息内容的类型进行转换。/**
* 发送单向消息
*
* @param msg 消息可以是任何对象,如:String、Map、对象等
*/
public void sendOneWay(String msg) {
rocketMQTemplate.sendOneWay("springboot_topic:test", msg);
}
/**
* 发送同步消息
*
* @param msg 消息可以是任何对象,如:String、Map、对象等
*/
public SendResult sendSync(String msg) {
SendResult result = rocketMQTemplate.syncSend("springboot_topic:test", msg);
log.info("发送结果:{}", result);
return result;
}
/**
* 发送异步消息
*
* @param msg 消息可以是任何对象,如:String、Map、对象等
*/
public void sendAsync(String msg) {
rocketMQTemplate.asyncSend("springboot_topic:test", msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送成功");
}
@Override
public void onException(Throwable throwable) {
log.info("发送失败");
}
});
}
/**
* 发送延时消息
*
* @param msg 消息可以是任何对象,如:String、Map、对象等
*/
public void sendDelay(String msg) {
rocketMQTemplate.syncSendDelayTimeSeconds("springboot_topic:test", msg, 5);
}
/**
* 发送顺序消息
*
* @param msg 消息可以是任何对象,如:String、Map、对象等
*/
public void sendOrderly(String msg) {
// 第一条
rocketMQTemplate.syncSendOrderly("springboot_topic:test", msg, "1");
// 第二条
rocketMQTemplate.syncSendOrderly("springboot_topic:test", msg, "2");
// 第三条
rocketMQTemplate.syncSendOrderly("springboot_topic:test", msg, "3");
}
略
/**
* 发送批量消息
*
* @param msgList 消息列表
*/
public void sendBatch(List<String> msgList) {
List<Message<String>> rocketMQMessages = new ArrayList<>();
for (String msg : msgList) {
rocketMQMessages.add(MessageBuilder.withPayload(msg).build());
}
rocketMQTemplate.syncSend("springboot_topic:test", rocketMQMessages);
}
SpringBoot整合RocketMQ,老鸟们都是这么玩的!
https://juejin.cn/post/7220075270664405052