上篇文章RabbitMQ 入门系列(一)讲述了 RabbitMQ 有关的基本概念。本文将会给出具体的示例继续讲解,这些示例均来源于官方文档,但其使用的是传统的回掉函数的写法,我将其改写成了async/await 的形式,同时对内容做了部分微调。
01
—
流程
我们先来了解一下 RabbitMQ 的一般使用流程。
1、建立到 RabbitMQ 的连接。
2、创建信道。
3、声明交换器。
4、声明队列。
5、绑定交换器和队列。
6、消息操作。生产者:生成并发布消息;消费者:订阅并消费消息。
7、关闭信道。
8、关闭连接。
不论是生产者投递消息,还是消费者接受消息一般都遵循以上步骤,但针对具体的情况仍会有调整,比如声明交换器、声明队列、绑定交换器和队列,我们只需要在生产者或消费者其中之一进行,甚至隔离出来独立维护,只要保证在发布或消费消息之前交换器、队列、绑定等是有效的即可。
02
—
Hello World
第一个示例,实现基本的投递和接收消息。
生产者投递消息(send.js):
消费者接收消息(receive.js):
对比上述流程,你会发现为什么没有交换器 Exchange 存在的身影呢?这是因为 RabbitMQ 存在一个默认交换器,类型为 direct (直连),每个新建的队列会自动绑定到默认交换器上,并且以队列的名称作为绑定路由规则。
声明队列时,同一个队列其属性前后相同时,重复声明不会有任何影响,反之其属性前后不相同时,重复声明会抛出一个错误,这种情况要注意不得重复声明,当然如果这个队列被声明有效了也不需要再次声明。
从上图中我们也了解到了队列的一个属性 durable,这个属性表明是否对队列进行持久化,也就是保存到磁盘上,一旦 RabbitMQ 服务器重启,持久化的队列可以被重新恢复。
消费者 consume 订阅接收消息时使用了另一个属性 noAck,这个属性表明消费者在接收到消息后是否需要向 RabbitMQ 服务器确认收到该消息。与之相对的是发后即忘模式,也就是RabbitMQ 服务器向消费者发送完消息后即认为成功,无需等待消费者确认接收应答,这种模式吞吐量更高,但可靠性显然不如确认应答模式,而确认应答模式,我们需要注意的是,RabbitMQ 服务器若没有接收到 ack 确认会一直将该消息保存,如果消费者挂了就会造成消息持续堆叠不断占用内存的情况,极端情况下资源过载会造成RabbitMQ 服务器重启,同时未被 ack 确认的消息会被尝试重新发送给消费者。
03
—
Work queues
第二个示例,向多个消费者分发投递消息。
生产者投递消息(new_task.js):
消费者接收消息(worker.js):
我们在 shell 中运行多个 worker.js 会发现消息被一个一个分发到了不同的 worker 消费者,且同一条消息不会被重复发送给多个 worker 。
在这个示例中,我们对队列进行了持久化,并且在消费端使用了 ack 确认接收消息。发送消息时,我们使用了 persistent 属性,这个属性表明是否将消息持久化。另外,对消费者而言,还使用了 ch.prefetch() 方法,这个方法表明该消费者每次最多接收的消息数量,这样做是因为某些情况下消费消息是一个很耗时的业务操作,某些 worker 可能处于繁忙状态,而另外一些 worker 则很空闲,通过 prefetch 和 ack 其实是实现了类似于负载均衡的功能,也就是将消息分发给空闲的 worker 消费。
04
—
结语
本文给出的两个示例,实现了基本的消息投递与接收功能,并对某些属性方法进行了简单的描述,读着可以将文中的示例与官方教程对比查看并加深理解。
下一篇文章将会通过三个具体的示例重点介绍交换器的不同类型。
领取专属 10元无门槛券
私享最新 技术干货