中间件,我给它的定义就是为了实现某系业务功能依赖的软件,包括如下部分:
Web服务器
代理服务器
ZooKeeper
Kafka
RabbitMQ(本章节)
我们通过虚拟主机,交换机,队列,绑定,将RabbitMQ连成了一个整体,生产者可以向交换机发送消息,交换机根据绑定规则可以把消息转发给对应的队列进行存储,消费者可以连接到队列去消费数据。
今天我们就通过代码向RabbitMQ发送消息,重点需要关注的就是生产者需要知道哪些信息。以下代码基于DeepSeek生成。
import pika
import json
import time
import random
from datetime import datetime
import uuid
import logging
import signal
import sys
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger("exchange_producer")
# RabbitMQ 配置
RABBITMQ_HOST = '192.168.31.151'
RABBITMQ_PORT = 5672
RABBITMQ_USER = 'admin'
RABBITMQ_PASS = '123456'
RABBITMQ_VHOST = '/test'
EXCHANGE_NAME = 'test01'
ROUTING_KEY = 'luyouke' # 路由键
# 全局变量,用于控制程序运行
running = True
def signal_handler(sig, frame):
"""处理Ctrl+C信号,优雅地停止程序"""
global running
logger.info("Received shutdown signal. Stopping producer...")
running = False
def generate_sensor_data():
"""生成随机的传感器数据"""
return {
"sensor_id": f"SENSOR-{random.randint(1000, 9999)}",
"timestamp": datetime.utcnow().isoformat() + "Z",
"reading_id": str(uuid.uuid4()),
"temperature": round(random.uniform(18.0, 32.0), 2),
"humidity": round(random.uniform(30.0, 90.0), 2),
"status": random.choice(["NORMAL", "WARNING", "CRITICAL"]),
"battery": round(random.uniform(20.0, 100.0), 2)
}
def setup_rabbitmq_connection():
"""建立RabbitMQ连接"""
try:
credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
parameters = pika.ConnectionParameters(
host=RABBITMQ_HOST,
port=RABBITMQ_PORT,
virtual_host=RABBITMQ_VHOST,
credentials=credentials,
heartbeat=600,
blocked_connection_timeout=300
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# 仅声明交换机(确保交换机存在)
channel.exchange_declare(
exchange=EXCHANGE_NAME,
exchange_type='direct',
durable=True
)
logger.info(f"Connected to RabbitMQ. Using exchange: {EXCHANGE_NAME}")
return connection, channel
except Exception as e:
logger.error(f"Failed to connect to RabbitMQ: {str(e)}")
return None, None
def main():
"""主函数,持续向交换机发送消息"""
global running
# 注册信号处理器
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
connection, channel = setup_rabbitmq_connection()
if not channel:
logger.error("Exiting due to connection failure")
return
logger.info("Starting exchange message producer")
logger.info(f"Sending messages to exchange: {EXCHANGE_NAME}")
logger.info(f"Using routing key: {ROUTING_KEY}")
logger.info("Press Ctrl+C to stop")
message_count = 0
try:
while running:
try:
# 如果连接关闭,尝试重新连接
if connection.is_closed or channel.is_closed:
logger.warning("Connection lost. Reconnecting...")
connection, channel = setup_rabbitmq_connection()
if not channel:
time.sleep(5)
continue
# 生成随机数据
sensor_data = generate_sensor_data()
# 发布消息到交换机
channel.basic_publish(
exchange=EXCHANGE_NAME,
routing_key=ROUTING_KEY,
body=json.dumps(sensor_data),
properties=pika.BasicProperties(
delivery_mode=2, # 持久化消息
content_type='application/json',
message_id=str(uuid.uuid4()),
timestamp=int(time.time())
)
)
message_count += 1
# 每10条消息打印一次摘要
if message_count % 10 == 0:
logger.info(f"Sent {message_count} messages | "
f"Exchange: {EXCHANGE_NAME} | "
f"Routing Key: {ROUTING_KEY}")
# 等待1秒
time.sleep(1)
except pika.exceptions.ConnectionClosedByBroker:
logger.warning("Connection closed by broker. Reconnecting...")
connection, channel = setup_rabbitmq_connection()
time.sleep(2)
except pika.exceptions.AMQPChannelError as e:
logger.error(f"Channel error: {str(e)}. Recreating channel...")
if connection and connection.is_open:
try:
channel = connection.channel()
except Exception as e:
logger.error(f"Failed to recreate channel: {str(e)}")
connection, channel = setup_rabbitmq_connection()
time.sleep(1)
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
time.sleep(1)
except Exception as e:
logger.error(f"Critical error: {str(e)}")
finally:
# 清理资源
if connection and connection.is_open:
try:
connection.close()
logger.info("RabbitMQ connection closed")
except Exception:
pass
logger.info(f"Producer stopped. Total messages sent to exchange: {message_count}")
if __name__ == "__main__":
main()
025-06-18 23:27:37,125 - INFO - Connected to RabbitMQ. Using exchange: ceph115
2025-06-18 23:27:37,125 - INFO - Starting exchange message producer
2025-06-18 23:27:37,125 - INFO - Sending messages to exchange: ceph115
2025-06-18 23:27:37,125 - INFO - Using routing key: luyoukey
2025-06-18 23:27:37,125 - INFO - Press Ctrl+C to stop
2025-06-18 23:27:46,139 - INFO - Sent 10 messages | Exchange: ceph115 | Routing Key: luyoukey
2025-06-18 23:27:56,154 - INFO - Sent 20 messages | Exchange: ceph115 | Routing Key: luyoukey
2025-06-18 23:28:06,170 - INFO - Sent 30 messages | Exchange: ceph115 | Routing Key: luyoukey
由于我这个交换机绑定了2个队列,使用相同的路由键所以两个队列都会收到相同的消息。总计658条消息。
每个队列都有289条消息。
以上是我按正常的条件(账号,密码,虚拟主机,交换机,路由键)向RabbitMQ发送消息,如果缺失(虚拟主机,交换机,路由键)会发送什么情况呢,大家可以下来去测试一下。