操作场景
本文以调用 Python SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
前提条件
下载 Demo
操作步骤
步骤1:添加依赖
1. 根据 RabbitMQ 官网推荐使用 pika,首先要在客户端使用环境中安装 pika。
python -m pip install pika --upgrade
2. 在创建客户端时导入 pika。
import pika
步骤2:生产消息
创建并编译运行生产消息程序 messageProducer.py。
import pika# 使用用户名和密码创建登录凭证对象credentials = pika.PlainCredentials('rolename', 'eyJr***')# 创建连接connection = pika.BlockingConnection(pika.ConnectionParameters(host='111.222.333.44', port=5672, virtual_host='Vhostname', credentials=credentials))# 建立信道channel = connection.channel()# 声明交换机channel.exchange_declare(exchange='ExchangeName', exchange_type="ExchangeType")routingKeys = ['aaa.bbb.ccc', 'aaa.bbb.ddd', 'aaa.ccc.zzz', "xxx.yyy.zzz"]for routingKey in routingKeys:# 发送消息到指定的交换机# 不指定交换机的情况下发送消息,需要指定消息队列,参数routing_key在使用指定交换机时,表示routing_key,不指定交换机时代表消息队列名称channel.basic_publish(exchange='direct_exchange',routing_key=routingKey,body=(routingKey + 'This is a new direct message.').encode(),properties=pika.BasicProperties(delivery_mode=2, # 设置消息持久化))print('send success msg to rabbitmq')connection.close()
参数 | 说明 |
rolename | 用户名称,填写在控制台创建的用户名称。 |
eyJr*** | 用户密码,填写在控制台创建用户时填写的密码。 |
host | 集群接入地址,在集群基本信息页面的客户端接入模块获取。 |
port | 集群接入地址端口,在集群基本信息页面的客户端接入模块获取。 |
virtual_host | Vhost 名称,在控制台 Vhost 列表获取。 |
direct_exchange | Exchange 名称,在控制台 Exchange 列表获取。 |
routingKeys | 消息的路由规则,在控制台 绑定关系列表的绑定 Key列获取。 |
步骤3:消费消息
创建并编译运行消费消息程序 messageConsumer.py。
import osimport pikaimport sysdef main():# 使用用户名和密码创建登录凭证对象credentials = pika.PlainCredentials('rolename', 'eyJr***')# 创建连接connection = pika.BlockingConnection(pika.ConnectionParameters(host='111.222.333.44', port=5672, virtual_host='Vhostname', credentials=credentials))# 建立信道channel = connection.channel()# 声明消息队列channel.queue_declare(queue='route_queue1', exclusive=True, durable=True)# 绑定消息队列到交换机,并指定 routing keyrouting_keys = ['aaa.bbb.ccc', 'aaa.bbb.ddd']for routingKey in routing_keys:channel.queue_bind(exchange='direct_exchange', queue="route_queue1", routing_key=routingKey)# 设置只接受一个未确认消息channel.basic_qos(prefetch_count=1)# 消息消费逻辑def callback(ch, method, properties, body):print(" [Consumer1(Direct 'aaa.bbb.ccc'/'aaa.bbb.ddd')] Received (%r)" % body)# 手动回复ACKch.basic_ack(delivery_tag=method.delivery_tag)# 创建消费者,消费消息队列中的消息channel.basic_consume(queue='route_queue1',on_message_callback=callback,auto_ack=False) # 设置为非自动确认print(" [Consumer1(Direct 'aaa.bbb.ccc'/'aaa.bbb.ddd')] Waiting for messages. To exit press CTRL+C")channel.start_consuming()if __name__ == '__main__':try:main()except KeyboardInterrupt:print('Interrupted')try:sys.exit(0)except SystemExit:os._exit(0)
参数 | 说明 |
rolename | 用户名称,填写在控制台创建的用户名称。 |
eyJr*** | 用户密码,填写在控制台创建用户时填写的密码。 |
host | 集群接入地址,在集群基本信息页面的获取接入地址获取。 |
port | 集群接入地址端口,在集群基本信息页面的获取接入地址获取。 |
virtual_host | Vhost 名称,在控制台 Vhost 列表获取。 |
direct_exchange | Exchange 名称,在控制台 Exchange 列表获取。 |
route_queue1 | Queue名称,在控制台 Queue 列表获取。 |
routingKey | 消息的路由规则,在控制台 绑定关系列表的绑定 Key列获取。 |
步骤4:查看消息
说明