想玩转RabbitMQ,pika这个Python客户端库是绕不开的。它是个纯Python写的AMQP协议实现,跟RabbitMQ配合特别好。那些大厂的消息队列系统,底层很多都在用它。今天带你实操一把,保证写完就能上手用。
安装配置
装个pika超简单,pip一把梭就完事:
1pip install pika
连接RabbitMQ也就几行代码的事:
1import pika
3# 建立连接
4connection = pika.BlockingConnection(
5 pika.ConnectionParameters(host='localhost')
6)
7channel = connection.channel()
9# 声明队列
10channel.queue_declare(queue='hello')
小贴士:
记得先把RabbitMQ服务启动了再连
本地测试用localhost,生产环境记得改成真实IP
默认端口是5672,要改端口得在ConnectionParameters里指定
发送消息
整个发消息的逻辑贼简单,写个生产者代码:
1# producer.py
2import pika
4connection = pika.BlockingConnection(
5 pika.ConnectionParameters('localhost')
6)
7channel = connection.channel()
9# 队列不存在才建,已存在就跳过
10channel.queue_declare(queue='hello', durable=True)
12# 发送消息
13message = “老铁们,这是一条测试消息”
14channel.basic_publish(
15 exchange='', # 默认交换机
16 routing_key='hello', # 队列名
17 body=message,
18 properties=pika.BasicProperties(
19 delivery_mode=2 # 消息持久化
20 )
21)
23print(f“发送消息: {message}”)
24connection.close()
接收消息
消费者这边也不难,整个回调函数处理消息就成:
1# consumer.py
2import pika
4def callback(ch, method, properties, body):
5 print(f“收到消息: {body.decode()}”)
6 ch.basic_ack(delivery_tag=method.delivery_tag)
8connection = pika.BlockingConnection(
9 pika.ConnectionParameters('localhost')
10)
11channel = connection.channel()
13channel.queue_declare(queue='hello', durable=True)
15# 每次只处理一条消息
16channel.basic_qos(prefetch_count=1)
18channel.basic_consume(
19 queue='hello',
20 on_message_callback=callback
21)
23print(“等待消息中...”)
24channel.start_consuming()
小贴士:
basic_ack确认消息已处理,不然消息会一直重发
prefetch_count限制未确认的消息数,防止消费者压力太大
加上durable=True让队列持久化,重启不丢数据
进阶玩法
来点高端操作,用交换机实现发布订阅:
1# 声明交换机
2channel.exchange_declare(
3 exchange='logs',
4 exchange_type='fanout' # 广播模式
5)
7# 发送消息到交换机
8channel.basic_publish(
9 exchange='logs',
10 routing_key='',
11 body=message
12)
消费者这边要绑定临时队列:
1# 创建临时队列
2result = channel.queue_declare(queue='', exclusive=True)
5# 绑定到交换机
6channel.queue_bind(
7 exchange='logs',
8 queue=queue_name
9)
小贴士:
fanout是广播模式,所有绑定的队列都能收到消息
direct可以根据routing_key路由消息
topic支持通配符匹配routing_key
异常处理
生产环境必须要处理连接断开的情况:
1try:
2 channel.start_consuming()
3except pika.exceptions.ConnectionClosedByBroker:
4 print(“连接被broker关闭”)
5except pika.exceptions.AMQPChannelError:
6 print(“channel异常”)
7except KeyboardInterrupt:
8 print(“手动停止消费”)
9finally:
10 channel.close()
11 connection.close()
用pika写消息队列就讲到这,贴心提醒下坑点:
默认是不持久化的,重启数据就没了
消息没确认的话会一直重发,小心内存爆了
生产环境记得加连接重试和心跳检测
长连接最好加个定时重连机制
代码写得不错,但也别忘了看看监控面板,关注下队列堆积情况。好了,整个pika实战就到这,上手写代码去吧!
领取专属 10元无门槛券
私享最新 技术干货