前 言 🍉 作者简介:半旧518,长跑型选手,立志坚持写10年博客,专注于java后端 ☕专栏简介:深入、全面、系统的介绍消息中间件 🌰 文章简介:本文将介绍springboot整合rabbitmq,消息可靠性的保证和死信队列等知识
新建项目rabbitmqdemo02,新建模块producer-springboot
修改改模块的pom.xml,引入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
配置下application.yml
spring:
rabbitmq:
host: localhost
username: guest
password: guest
virtual-host: /
port: 5672
启动类com.wangzhou.ProducerApplication。
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
按如下结构新建配置类RabbitMQConfig
编写配置类。
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_NAME = "boot_queue";
public static final String EXCHANGE_NAME = "boot_exchange";
@Bean("bootExchange")
public Exchange bootExchange() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
@Bean("bootQueue")
public Queue bootQueue() {
return QueueBuilder.durable(QUEUE_NAME).build();
}
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue")Queue queue, @Qualifier("bootExchange")Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
编写测试类。
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
// 1.注入RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend() {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boot.haha", "boot mq haha~~~~");
}
}
运行,rabbitmq管控台就有我们创建的队列了。
点进去还可以看到具体的消息详情。
步骤与生产者极其类似。
创建工程consumer-springboot
编写pom.xml。引入依赖。
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
编写yml配置
spring:
rabbitmq:
host: localhost
username: guest
password: guest
virtual-host: /
port: 5672
新建主启动类。
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
新建监听类。
@Component
public class RabbitMQListener {
@RabbitListener(queues = "boot_queue")
public void ListenQueue(Message message) {
System.out.println(message);
}
}
启动类运行。完美!
小结下。
上面问题的答案是:发送时丢失(未到交换机或者到交换机未到队列),MQ丢失,消费者丢失。
针对这些可能性,我们将介绍如下高级特性。
基于这些问题,我们需要进一步学习MQ的一些高级特性。
新建一个工程,mq-advanced-demo。项目的架构如下图。
完整代码。
logging:
pattern:
dateformat: HH:mm:ss:SSS
level:
cn.itcast: debug
spring:
rabbitmq:
host: localhost # rabbitMQ的ip地址
port: 5672 # 端口
username: guest
password: guest
virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
当消息到交换器,但是路由过程中出现问题,通过ReturnCallback回调。
每个RabbitTemplate只能配置一个ReturnCallback,而RabbitTemplate是由spring容器创建的,是单例实例。因此ReturnCallback必须在全局进行配置,即在项目启动过程进行配置。
因此,CommonConfig实现了ApplicationContextAware接口。我们知道,Aware是通知接口,而ApplicationContext是一个bean容器,管理spring项目中的bean。因此,实现了ApplicationContextAware接口即意味着可以在项目启动所有bean(当然包括rabbitTemplate)加载以后调用回调,获取rabbitTemplate,设置全局的ReturnCallback。具体细节可以看接口的方法实现setApplicationContext。
代码如下。
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate对象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 记录日志
log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有需要的话,重发消息
});
}
}
当消息甚至还没有到达交换机,通过ConfirmCallback来执行回调策略。这时不需要全局唯一的ConfrimCallback回调,可以每次发消息时指定不同的ConfirmCallback回调。因此代码放到单元测试类中即可。
代码如下。
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
// 1.准备消息
String message = "hello, spring amqp!";
// 2.准备CorrelationData
// 2.1.消息ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 2.2.准备ConfirmCallback
correlationData.getFuture().addCallback(result -> {
// 判断结果
if (result.isAck()) {
// ACK
log.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId());
} else {
// NACK
log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
// 重发消息
}
}, ex -> {
// 记录日志
log.error("消息发送失败!", ex);
// 重发消息
});
// 3.发送消息
rabbitTemplate.convertAndSend("amq.topic", "simple.test", message, correlationData);
}
}
上面代码中的交换机和queue及二者的绑定,可以手动的在管控台创建,配置(如果有不需要)。
跑以下这段测试代码。结果如下。
在管控台可以看到消息ready数为1。
下面演示下消息根本没有到达交换机,没有返回值的失败情况。将代码中交互机修改成一个不存在的,如aamp
在创建队列与交换机时可以设置是否持久化,这样不会因为宕机而丢失消息。在管控台上傻瓜式,选择Durable即可。
在代码里实现也特别简单,指定参数即可。我们这里不演示了,直接把代码截图贴给大家。
另外,在spring中队列、交换机和消息默认情况下其实都是持久的哦。
经过生产者消息确认机制和消息持久化,消息一定可以投递到消费者,但是是否消息一定可以被消费还不一定,如果投递时,消费者死了。那就GG了。
因此还需要消费者消息确认机制。
先将配置中acknowledge-mode
设置成none测试下。
logging:
pattern:
dateformat: HH:mm:ss:SSS
level:
cn.itcast: debug
spring:
rabbitmq:
host: localhost # rabbitMQ的ip地址
port: 5672 # 端口
username: guest
password: guest
virtual-host: /
listener:
simple:
prefetch: 1
acknowledge-mode: none
编写下监听类,在代码中模拟下处理过程出现异常的情形。
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
log.debug("消费者接收到simple.queue的消息:【" + msg + "】");
System.out.println(1 / 0);
log.info("消费者处理消息成功!");
}
}
如下图,打一个断点。
在管控台确定下队列的情况,可以看到现在有一条消息。
debug消费者。
执行到断点处,查看管控台,队列已经没有消息了。
这说明消息已经投递到消费者进行消费了。接着走,消费者就出异常了,消息丢失。
接着来,将 acknowledge-mode:设置为auto。使用生产者发送一条新消息,再用消费者debug。
管控台如下所示,发现unacked字段是1,说明此时消息已经被消费者获取,但是还没有返回值ack。
如果放开断点直接跑,消费者会一直刷新获取消息。消息会一直重新尝试投递。
这样的方式比直接丢消息要好一点,但是捏,也不完美,如果消费者代码本身没有问题,消费者会最终将消息消费,如果代码本身有问题,就一直跑着。后面会学习更加升级的做法。
上述问题,可以设置重试的上限。设置很简单,在消费者的配置文件里配配就好。
读者请自测。
上面的策略有一个问题,重试多次以后消息就丢了,普通消息无所谓,重要消息那就难受了。
实际上,可以指定消费者失败消息处理策略。
第三种策略显然是最完整的,生产中很推荐。其具体做法参考下图。
做一下,编写ErrorMessageConfig
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue");
}
@Bean
public Binding errorMessageBinding(){
return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
测试下,重新跑下消费者。
在管控台可以检测下queue,exchange及其绑定关系。
直接通过管控台给simple.queue发送下消息。
效果是这样的。
在管控台的error.queue中可以看到消息,甚至可以看到具体的异常栈信息!牛啊!
总结下。
举一个栗子,订单超时未支付则自动取消。
下面用代码实现下第一种方式吧。
生产者模块新增配置类TTLRabbitConfiguration.
@Configuration
public class TTLRabbitConfiguration {
public static final String QUEUE_NAME = "ttl_queue_test";
public static final String EXCHANGE_NAME = "ttl_exchange_test";
@Bean("ttlExchange")
public Exchange ttlExchange() {
return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();
}
@Bean("ttlQueue")
public Queue ttlQueue() {
Map<String, Object> args = new HashMap<>();
// 设置5s的过期时间
args.put("x-message-ttl", 5000);
return QueueBuilder.durable(QUEUE_NAME).withArguments(args).build();
}
@Bean
public Binding bindTTLQueueExchange(@Qualifier("ttlQueue")Queue queue, @Qualifier("ttlExchange")Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();
}
测试类新增方法。
@Test
public void testTTLSend() {
rabbitTemplate.convertAndSend(TTLRabbitConfiguration.EXCHANGE_NAME, "ttl", "ttl mq haha~~~~");
}
启动运行,可以看到下图中ttl_queue_test会有标识TTL,有1条消息ready。
5s以后,ready的消息数会变成0条。
配置类
@Configuration
public class MSGTTLRabbitConfig {
public static final String QUEUE_NAME = "msg_queue";
public static final String EXCHANGE_NAME = "msg_exchange";
@Bean("msgExchange")
public Exchange msgExchange() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
@Bean("msgQueue")
public Queue msgQueue() {
return QueueBuilder.durable(QUEUE_NAME).build();
}
@Bean
public Binding bindMSGQueueExchange(@Qualifier("msgQueue")Queue queue, @Qualifier("msgExchange")Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("msg_ttl").noargs();
}
}
测试方法。
@Test
public void testMsgTTLSend() {
MessagePostProcessor postProcessor= new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
rabbitTemplate.convertAndSend(MSGTTLRabbitConfig.EXCHANGE_NAME, "msg_ttl", "msg_ttl mq haha~~~~",postProcessor);
}
读者可以自测。
如果ttl到达,直接将消息删除,消息永久就消失了。实际上业务往往不会真的删除,而是将过期队列中过期的消息移入死信交换机。
注意与前面所学的消息失败的异常交换机进行对比。可以发现,异常消息是消费者将其投递到异常队列,而死信消费者可不会管事哦。
死信交换机当然也可以做异常兜底,但是他还有其它的应用场景。建议异常兜底方案还是使用异常交换机来搞。
由于死信消息会直接由普通队列投递到死信队列,而不是通过consumer,因此,需要在投递时指定死信交换机和对应的路由key。
总结下。
手工去实现延迟队列多少有点繁琐,可以使用官方插件来快速做。
下面来安装下延迟队列插件。
官方的安装指南地址为:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
下面我们会讲解基于Docker来安装RabbitMQ插件,如果您是通过其它方式安装的RabbitMQ,可以选择使用docker再装下或者自己查找对应的插件安装方式。
RabbitMQ有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html
其中包含各种各样的插件,包括我们要使用的DelayExchange插件:
大家可以去对应的GitHub页面下载3.8.9版本的插件,地址为https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9这个对应RabbitMQ的3.8.5以上版本。
因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。
我们之前设定的RabbitMQ的数据卷名称为mq-plugins
,所以我们使用下面命令查看数据卷:
docker volume inspect mq-plugins
可以得到下面结果:
接下来,将插件上传到这个目录即可:
最后就是安装了,需要进入MQ容器内部来执行安装。我的容器名为mq
,所以执行下面命令:
docker exec -it mq bash
执行时,请将其中的 -it
后面的mq
替换为你自己的容器名.
进入容器内部后,执行下面命令开启插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
结果如下:
在管控台。
或者也可以在代码中做上面同样的工作。
声明下死信交换机。
代码贴下。
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayExchange(String msg) {
log.info("消费者接收到了delay.queue的延迟消息");
}
发消息。
代码如下。
@Test
public void testSendDelayMessage() throws InterruptedException {
// 1.准备消息
Message message = MessageBuilder
.withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setHeader("x-delay", 5000)
.build();
// 2.准备CorrelationData
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.发送消息
rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
log.info("发送消息成功");
}
跑一下,会发现一个问题
实际上消息只是延迟了,但是异常队列处理了它。因此我们需要对之前的异常策略进行下增强。将生产者的config进行下增强,判断下是否是延迟消息。
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate对象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 判断是否是延迟消息
Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
if (receivedDelay != null && receivedDelay > 0) {
// 是一个延迟消息,忽略这个错误提示
return;
}
// 记录日志
log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有需要的话,重发消息
});
}
}
总结下。