RabbitMQ 是一个开源的消息代理和队列服务器,用于通过轻量级和可靠的消息在服务器之间进行通信。它实现了高级消息队列协议(AMQP),并支持多种消息传递模型,如发布/订阅、请求/响应和点对点。
MySQL 是一个流行的关系型数据库管理系统(RDBMS),广泛应用于各种规模的应用程序中,用于存储、检索和管理数据。
在高并发场景下,RabbitMQ 和 MySQL 常常一起使用,以处理大量的并发请求。
以下是一个简单的 Python 示例,展示如何使用 RabbitMQ 和 MySQL:
生产者 (producer.py):
import pika
import mysql.connector
# 连接到 RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明一个队列
channel.queue_declare(queue='task_queue', durable=True)
# 连接到 MySQL
db = mysql.connector.connect(
host="localhost",
user="user",
password="password",
database="mydatabase"
)
cursor = db.cursor()
# 模拟发送消息
for i in range(10):
message = f"Task {i}"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
))
print(f" [x] Sent {message}")
# 关闭连接
cursor.close()
db.close()
connection.close()
消费者 (consumer.py):
import pika
import mysql.connector
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
# 连接到 MySQL
db = mysql.connector.connect(
host="localhost",
user="user",
password="password",
database="mydatabase"
)
cursor = db.cursor()
# 处理消息并更新数据库
cursor.execute("INSERT INTO tasks (task) VALUES (%s)", (body,))
db.commit()
# 关闭连接
cursor.close()
db.close()
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
# 连接到 RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明一个队列
channel.queue_declare(queue='task_queue', durable=True)
# 设置 QoS
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
参考链接:
领取专属 10元无门槛券
手把手带您无忧上云