RabbitMQ
是基于 AMQP
协议实现的,该协议实现了事务机制,因此 RabbitMQ
也支持事务机制。Spring AMQP
也提供了对事务相关的操作,RabbitMQ
事务允许开发者确保消息的发送和接收是原子性的,要么全部成功,要么全部失败
@Configuration
public class TransactionConfig {
@Bean
public RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
}
@Bean("transQueue")
public Queue transQueue() {
return QueueBuilder.durable("trans_queue").build();
}
@RestController
@RequestMapping("/trans")
public class TransactionProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
@RequestMapping("/send")
public String send() {
rabbitTemplate.convertAndSend("", "trans_queue", "trans test 1...");
int a = 50 / 0;
rabbitTemplate.convertAndSend("", "trans_queue", "trans test 2...");
return "发送成功";
}
}
@Transactional
,会发现消息 1 发送成功@Transactional
,消息 1 和消息 2 全部发送失败RabbitMQ
队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅列表的一个消费者。这种方式非常适合扩展,如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可
默认情况下 RabbitMQ
是以轮询的方法进行分发的,而不管消费者是否已经消费并已经确认了消息。这种方式是不太合理的,试想一下,如果某些消费者消费速度慢,而某些消费者速度很快,就可能会导致某些消费者消息积压,某些消费者空闲,进而应用整体的吞吐量下降
如何处理呢?
channel.basicQos(int prefetchCount)
方法,来限制当前信道上的消费者所能保持的最大未确认消息的数量比如:消费端调用了 channelbasicQos(5)
,RabbitMQ
会为该消费者计数,发送一条消息消息计数 +1
,消费一条消息计数 -1
,当达到了设定的上限,RabbitMQ
就不会再向它发送消息了,直到消费者确认了某条消息
TCP/IP
中的滑动窗口消息分发的常见场景有如下:
如下使用场景:
订单系统每秒最多处理 5000
请求,正常情况下,订单系统可以正常满足需求,但是在秒杀时间点,请求瞬间增多,每秒 1万
个请求,如果这些请求全部通过 MQ
发送到订单系统,无疑会把订单系统压垮
RabbitMQ
提供了限流机制,可以控制消费端一次只拉取 N
个请求prefetchCount
参数,同时也必须要设置消息应答方式为手动应答prefetchCount
:控制消费者从队列中预取(prefetch
)消息的数量,以此来实现流量控制和负载均衡prefetch
参数,设置应答方式为手动应答listener:
simple:
acknowledge-mode: manual # 消息接收确认
prefetch: 5
// 消息分发——限流
public static final String QOS_EXCHANGE_NAME = "qos_exchange";
public static final String QOS_QUEUE = "qos_queue";
@Configuration
public class QosConfig {
// 1. 交换机
@Bean("qosExchange")
public Exchange qosExchange() {
return ExchangeBuilder.directExchange(Constant.QOS_EXCHANGE_NAME).durable(true).build();
}
// 2. 队列
@Bean("qosQueue")
public Queue qosQueue() {
return QueueBuilder.durable(Constant.QOS_QUEUE).build();
}
// 3. 队列和交换机绑定
@Bean("qosBinding")
public Binding qosBinding(@Qualifier("qosExchange") Exchange exchange, @Qualifier("qosQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();
}
}
@RequestMapping("/qos")
public String qos() {
// 发送消息
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(Constant.QOS_EXCHANGE_NAME,"qos", "qos test..." + i);
}
return "发送成功";
}
@Component
public class QosQueueListener {
// 指定监听队列的名称
@RabbitListener(queues = Constant.QOS_QUEUE)
public void ListenerQueue(Message message, Channel channel) throws Exception{
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), deliveryTag);
// 3. 手动签收
//channel.basicAck(deliveryTag, true);
}
}
调用接口,发送消息:
我们观察管理平台
ready
,也就是待发送 15
条,未确认的 5
条(因为代码未手动 ack
)把配置里面的 prefetch: 5
注掉,然后在观察运行结果。
20
条消息全部收到管理平台:
我们也可以用此配置,来实现“负载均衡”
如下图所示,在有两个消费者的情况下,一个消费者处理任务非常快,另一个非常慢,就会造成一个消费老会一直很忙,而另一个消费者很闲
RabbitMQ
只是在消息进入队列时分派消息,他不考虑消费者未确认消息的数量我们可以使用设置 prefetch=1
的方式,告诉 RabbitMQ
一次只给一个消费者一条信息,也就是说,在处理并确认前一条消息之前,不要向该消费者发送新消息。相反,它会将它分派给下一个不忙的消费者
prefetch
参数,设置应答方式为手动应答listener:
simple:
acknowledge-mode: manual # 消息接收确认
prefetch: 1
Thread.sleep(100)
来模拟消费满@Component
public class QosQueueListener {
// 指定监听队列的名称
@RabbitListener(queues = Constant.QOS_QUEUE)
public void ListenerQueue(Message message, Channel channel) throws Exception{
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), deliveryTag);
// 3. 手动签收
channel.basicAck(deliveryTag, true);
}
// 指定监听队列的名称
@RabbitListener(queues = Constant.QOS_QUEUE)
public void ListenerQueue2(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("消费者2接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), deliveryTag);
// 模拟处理流程慢
Thread.sleep(100);
// 手动签收
channel.basicAck(deliveryTag, true);
}
}
调用接口,发送消息