2021-02-18. NC.
我们知道,消息在RabbitMQ的整个生命周期是生产者投递消息到Exchange,Exchange根据路由键将消息路由到合适的Queue,Queue再将消息推(或消费者主动拉)给消费者。
在这个过程当中,Exchange根据路由键将消息路由到合适的Queue的过程,可能发生诸如
从而导致消息路由失败。对于这些路由失败的消息应该如何处理呢?有两种方式:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=futao
spring.rabbitmq.password=123456789
spring.rabbitmq.virtual-host=/tech-sharing
# 当exchange无法找到任何一个合适的queue时,将消息return给生产者
spring.rabbitmq.template.mandatory=true
# 必须设置为true,否则消息消息路由失败也无法触发Return回调
spring.rabbitmq.publisher-returns=true
@Slf4j
@Component
public class NoMatchQueue {
/**
* 交换机名称
*/
public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE";
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void send() {
log.info("发送消息");
Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus());
Message message = MessageBuilder
.withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8))
.setContentEncoding(StandardCharsets.UTF_8.displayName())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.build();
rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);
}
}
@Configuration
class ExchangeDeclare {
/**
* 只定义一个交换机,但是不绑定任何Queue,所以发送到该Exchange的消息都会路由失败
*
* @return
*/
@Bean
public Exchange noMatchQueueExchange() {
return ExchangeBuilder
.topicExchange(NoMatchQueue.EXCHANGE_NAME)
.durable(true)
.build();
}
}
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("消息被退回:{}", returnedMessage);
}
});
使用方式1需要我们在程序中进行编码设置回调函数监听,增加了生产者代码的复杂性,那么为了消息不丢失还有没有其他方式来处理路由失败的消息呢:答案是使用备份交换机。
fanout
。spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=futao
spring.rabbitmq.password=123456789
spring.rabbitmq.virtual-host=/tech-sharing
# 当exchange无法找到任何一个合适的queue时,将消息return给生产者
spring.rabbitmq.template.mandatory=false
# 必须设置为true,否则消息消息路由失败也无法触发Return回调
spring.rabbitmq.publisher-returns=false
/**
* 业务交换机
*
* @return
*/
@Bean
public Exchange noMatchQueueExchange() {
return ExchangeBuilder
.topicExchange(NoMatchQueueAlternateExchange.EXCHANGE_NAME)
.durable(true)
// 绑定备份交换机
.alternate(X_ALTERNATE)
.build();
}
/**
* 备份队列
*
* @return
*/
@Bean
public Queue alternateQueue() {
return QueueBuilder
.durable("Q_ALTERNATE")
.build();
}
/**
* 备份交换机
*
* @return
*/
@Bean
public Exchange alternateExchange() {
return ExchangeBuilder
.fanoutExchange(X_ALTERNATE)
.durable(true)
.build();
}
/**
* 备份绑定
*
* @param alternateExchange
* @param alternateQueue
* @return
*/
@Bean
public Binding alternateBinding(Exchange alternateExchange, Queue alternateQueue) {
return BindingBuilder
.bind(alternateQueue)
.to(alternateExchange)
.with("")
.noargs();
}
/**
* 正常业务交换机
*/
public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE_ALTERNATE";
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
*/
@PostConstruct
public void send() {
log.info("发送消息");
Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus());
Message message = MessageBuilder
.withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8))
.setContentEncoding(StandardCharsets.UTF_8.displayName())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.build();
rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);
}
# 在原生RabbitMQ-client中演示这一过程:
@Slf4j
public class AeTest {
/**
* 获取Channel
*/
private static final Channel CHANNEL = MqChannelUtils.getChannel();
/**
* 备份交换机
*/
private static final String X_AE = "X_AE";
/**
* 备份交换机绑定的队列
*/
private static final String Q_AE = "Q_AE";
/**
* 正常业务的交换机
*/
private static final String X_1 = "X_1";
public static void main(String[] args) throws IOException {
// 定义备份交换机-其实也是一个正常的交换机
CHANNEL.exchangeDeclare(X_AE, BuiltinExchangeType.FANOUT, true);
// 定义备份队列
CHANNEL.queueDeclare(Q_AE, true, false, false, null);
// 绑定备份
CHANNEL.queueBind(Q_AE, X_AE, "");
HashMap<String, Object> arguments = new HashMap<>();
// 绑定的备份交换机
arguments.put("alternate-exchange", X_AE);
// 定义交换机
CHANNEL.exchangeDeclare(X_1, BuiltinExchangeType.TOPIC, false, false, arguments);
// 添加监听器,看看是否还会return消息
CHANNEL.addReturnListener(new ReturnCallback() {
@Override
public void handle(Return returnMessage) {
log.error("消息被退回{}", returnMessage);
}
});
// 尝试向交换机发送消息(无法路由)- mandatory参数无效
CHANNEL.basicPublish(X_1, "", false, false,
new AMQP.BasicProperties(), "阿依古丽".getBytes(StandardCharsets.UTF_8));
}
}
X_1
和备份交换机X_AE
https://gitee.com/FutaoSmile/tech-sharing-mq
往期推荐
你可知道publisherReturns参数在spring-boot-starter-amqp中的作用?
SpringBoot如何做到自动帮我们创建RabbitMQ的Queue和Exchange的?
欢迎在评论区留下你看文章时的思考,及时说出,有助于加深记忆和理解,还能和像你一样也喜欢这个话题的读者相遇~