Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >RabbitMQ-生产者(Producer)

RabbitMQ-生产者(Producer)

作者头像
运维小路
发布于 2025-06-21 03:25:12
发布于 2025-06-21 03:25:12
4500
代码可运行
举报
文章被收录于专栏:运维小路运维小路
运行总次数:0
代码可运行

中间件,我给它的定义就是为了实现某系业务功能依赖的软件,包括如下部分:

Web服务器

代理服务器

ZooKeeper

Kafka

RabbitMQ(本章节)

我们通过虚拟主机,交换机,队列,绑定,将RabbitMQ连成了一个整体,生产者可以向交换机发送消息,交换机根据绑定规则可以把消息转发给对应的队列进行存储,消费者可以连接到队列去消费数据。

今天我们就通过代码向RabbitMQ发送消息,重点需要关注的就是生产者需要知道哪些信息。以下代码基于DeepSeek生成。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import pika
import json
import time
import random
from datetime import datetime
import uuid
import logging
import signal
import sys

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger("exchange_producer")

# RabbitMQ 配置
RABBITMQ_HOST = '192.168.31.151'
RABBITMQ_PORT = 5672
RABBITMQ_USER = 'admin'
RABBITMQ_PASS = '123456'
RABBITMQ_VHOST = '/test'
EXCHANGE_NAME = 'test01'
ROUTING_KEY = 'luyouke'  # 路由键

# 全局变量,用于控制程序运行
running = True

def signal_handler(sig, frame):
    """处理Ctrl+C信号,优雅地停止程序"""
    global running
    logger.info("Received shutdown signal. Stopping producer...")
    running = False

def generate_sensor_data():
    """生成随机的传感器数据"""
    return {
        "sensor_id": f"SENSOR-{random.randint(1000, 9999)}",
        "timestamp": datetime.utcnow().isoformat() + "Z",
        "reading_id": str(uuid.uuid4()),
        "temperature": round(random.uniform(18.0, 32.0), 2),
        "humidity": round(random.uniform(30.0, 90.0), 2),
        "status": random.choice(["NORMAL", "WARNING", "CRITICAL"]),
        "battery": round(random.uniform(20.0, 100.0), 2)
    }

def setup_rabbitmq_connection():
    """建立RabbitMQ连接"""
    try:
        credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
        parameters = pika.ConnectionParameters(
            host=RABBITMQ_HOST,
            port=RABBITMQ_PORT,
            virtual_host=RABBITMQ_VHOST,
            credentials=credentials,
            heartbeat=600,
            blocked_connection_timeout=300
        )

        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()

        # 仅声明交换机(确保交换机存在)
        channel.exchange_declare(
            exchange=EXCHANGE_NAME,
            exchange_type='direct',
            durable=True
        )

        logger.info(f"Connected to RabbitMQ. Using exchange: {EXCHANGE_NAME}")
        return connection, channel

    except Exception as e:
        logger.error(f"Failed to connect to RabbitMQ: {str(e)}")
        return None, None

def main():
    """主函数,持续向交换机发送消息"""
    global running

    # 注册信号处理器
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    connection, channel = setup_rabbitmq_connection()
    if not channel:
        logger.error("Exiting due to connection failure")
        return

    logger.info("Starting exchange message producer")
    logger.info(f"Sending messages to exchange: {EXCHANGE_NAME}")
    logger.info(f"Using routing key: {ROUTING_KEY}")
    logger.info("Press Ctrl+C to stop")

    message_count = 0

    try:
        while running:
            try:
                # 如果连接关闭,尝试重新连接
                if connection.is_closed or channel.is_closed:
                    logger.warning("Connection lost. Reconnecting...")
                    connection, channel = setup_rabbitmq_connection()
                    if not channel:
                        time.sleep(5)
                        continue

                # 生成随机数据
                sensor_data = generate_sensor_data()

                # 发布消息到交换机
                channel.basic_publish(
                    exchange=EXCHANGE_NAME,
                    routing_key=ROUTING_KEY,
                    body=json.dumps(sensor_data),
                    properties=pika.BasicProperties(
                        delivery_mode=2,  # 持久化消息
                        content_type='application/json',
                        message_id=str(uuid.uuid4()),
                        timestamp=int(time.time())
                    )
                )

                message_count += 1

                # 每10条消息打印一次摘要
                if message_count % 10 == 0:
                    logger.info(f"Sent {message_count} messages | "
                               f"Exchange: {EXCHANGE_NAME} | "
                               f"Routing Key: {ROUTING_KEY}")

                # 等待1秒
                time.sleep(1)

            except pika.exceptions.ConnectionClosedByBroker:
                logger.warning("Connection closed by broker. Reconnecting...")
                connection, channel = setup_rabbitmq_connection()
                time.sleep(2)

            except pika.exceptions.AMQPChannelError as e:
                logger.error(f"Channel error: {str(e)}. Recreating channel...")
                if connection and connection.is_open:
                    try:
                        channel = connection.channel()
                    except Exception as e:
                        logger.error(f"Failed to recreate channel: {str(e)}")
                        connection, channel = setup_rabbitmq_connection()
                time.sleep(1)

            except Exception as e:
                logger.error(f"Unexpected error: {str(e)}")
                time.sleep(1)

    except Exception as e:
        logger.error(f"Critical error: {str(e)}")

    finally:
        # 清理资源
        if connection and connection.is_open:
            try:
                connection.close()
                logger.info("RabbitMQ connection closed")
            except Exception:
                pass

        logger.info(f"Producer stopped. Total messages sent to exchange: {message_count}")

if __name__ == "__main__":
    main()
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
025-06-18 23:27:37,125 - INFO - Connected to RabbitMQ. Using exchange: ceph115
2025-06-18 23:27:37,125 - INFO - Starting exchange message producer
2025-06-18 23:27:37,125 - INFO - Sending messages to exchange: ceph115
2025-06-18 23:27:37,125 - INFO - Using routing key: luyoukey
2025-06-18 23:27:37,125 - INFO - Press Ctrl+C to stop
2025-06-18 23:27:46,139 - INFO - Sent 10 messages | Exchange: ceph115 | Routing Key: luyoukey
2025-06-18 23:27:56,154 - INFO - Sent 20 messages | Exchange: ceph115 | Routing Key: luyoukey
2025-06-18 23:28:06,170 - INFO - Sent 30 messages | Exchange: ceph115 | Routing Key: luyoukey

由于我这个交换机绑定了2个队列,使用相同的路由键所以两个队列都会收到相同的消息。总计658条消息。

每个队列都有289条消息。

以上是我按正常的条件(账号,密码,虚拟主机,交换机,路由键)向RabbitMQ发送消息,如果缺失(虚拟主机,交换机,路由键)会发送什么情况呢,大家可以下来去测试一下。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-06-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 运维小路 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
RabbitMQ-消费者(Consumer)
上个小节我们通过生产者代码,向RabbitMQ的交换机发送消息,本小节我们就通过消费者代码去读取队列里面数据,以下代码基于DeepSeek生成。
运维小路
2025/06/23
1090
RabbitMQ-消费者(Consumer)
pika.exceptions.ConnectionClosed 问题
最近一个处理程序又遇到 pika.exceptions.ConnectionClosed 这个问题,
周小董
2019/03/25
3K0
pika.exceptions.ConnectionClosed 问题
RabbitMQ 模型和死信队列
RabbitMQ 是一个生产者/消费者模型,生产者生产消息到队列中,而消费者从队列中拿消息进行消费,两者并不直接交互。
CS实验室
2021/03/22
6940
RabbitMQ 模型和死信队列
部署Rabbitmq
RabbitMQ是一个开源的靠AMQP协议实现的服务,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 它可以使对应的客户端(client)与对应的消息中间件(broker)进行交互。消息中间件发布者(publisher)那里收到消息(发布消息的应用,也称为producer),然后将他们转发给消费者(consumers,处理消息的应用)。由于AMQP是一个网络协议,所以发布者、消费者以及消息中间件可以部署到不同的物理机器上。
小手冰凉
2020/04/02
6730
【RabbitMQ】重识
RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。
后端码匠
2023/11/12
3660
RabbitMQ(从安装到使用)
  RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
Wyc
2018/09/11
5550
RabbitMQ(从安装到使用)
RabbitMQ中的Exchange是什么?它有哪些类型?
在RabbitMQ中,Exchange(交换机)是消息的中转站,用于接收生产者发送的消息,并将其路由到一个或多个队列。Exchange根据特定的路由规则将消息发送到队列中,以便消费者可以从队列中接收消息。
GeekLiHua
2025/01/21
3280
python中RabbitMQ的使用(安装和简单教程)
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现的产品,RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接收者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收message
周小董
2019/03/25
4K0
python中RabbitMQ的使用(安装和简单教程)
研究一下RabbitMQ
http://192.168.1.6:15672 默认账号:guest / guest
全栈程序员站长
2022/06/30
5340
研究一下RabbitMQ
Python之Rabbitmq的fanout模式
这种模式下,传递到 Exchange 的消息将会转发到所有与其绑定的 Queue 上。
Wu_Candy
2022/07/04
4590
Python之Rabbitmq的fanout模式
Docker中部署RabbitMQ并使用Python3.x操作全书(Python操作RabbitMQ看这一篇就够了)
使用Python操作RabbitMQ的书籍以及例子,少之又少。翻遍了网上所有的例子,发现十个有9个半不能运行的,这半个你还得修改。 原因很简单,要么例子的Python版本太低了,要么例子的RabbitMQ的版本太低了。所以造成了一系列文字。 让我很痛苦,决定下笔写一篇关于这个的文章。
手撕代码八百里
2020/07/28
1.8K0
Docker中部署RabbitMQ并使用Python3.x操作全书(Python操作RabbitMQ看这一篇就够了)
python操作rabbitmq 实践笔
2.  实现功能: (1)rabbitmq循环调度,将消息循环发送给不同的消费者,如:消息1,3,5发送给消费者1;消息2,4,6发送给消费者2。                    (2)消息确认机制,为了确保一个消息不会丢失,RabbitMQ支持消息的确认 , 一个 ack(acknowlegement) 是从消费者端发送一个确认去告诉RabbitMQ 消息已经接收了、处理了,RabbitMQ可以释放并删除掉了。如果一个消费者死掉了(channel关闭、connection关闭、或者TCP连接断开了)而没有发送ack,RabbitMQ 就会认为这个消息没有被消费者处理,并会重新发送到生产者的队列里,如果同时有另外一个消费者在线,rabbitmq将会将消息很快转发到另外一个消费者中。 那样的话你就能确保虽然一个消费者死掉,但消息不会丢失。         这个是没有超时的,当消费方(consumer)死掉后RabbitMQ会重新转发消息,即使处理这个消息需要很长很长时间也没有问题。消息的 acknowlegments 默认是打开的,在前面的例子中关闭了: no_ack = True . 现在删除这个标识 然后 发送一个 acknowledgment。                    (3)消息持久化,将消息写入硬盘中。  RabbitMQ不允许你重新定义一个已经存在、但属性不同的queue。需要标记消息为持久化的 - 要通过设置 delivery_mode 属性为 2来实现。         消息持久化的注意点:         标记消息为持久化并不能完全保证消息不会丢失,尽管已经告诉RabbitMQ将消息保存到磁盘,但RabbitMQ接收到的消息在还没有保存的时候,仍然有一个短暂的时间窗口。RabbitMQ不会对每个消息都执行同步 --- 可能只是保存到缓存cache还没有写入到磁盘中。因此这个持久化保证并不是很强,但这比我们简单的任务queue要好很多,如果想要很强的持久化保证,可以使用 publisher confirms。                    (4)公平调度。在一个消费者未处理完一个消息之前不要分发新的消息给它,而是将这个新消息分发给另一个不是很忙的消费者进行处理。为了解决这个问题我们可以在消费者代码中使用 channel.basic.qos ( prefetch_count = 1 ),将消费者设置为公平调度。 生产者
py3study
2020/01/09
2.1K0
RabbitMQ入门
笔者经常能看到MQ这个词,知道其作为消息队列,但始终没有接触过,现在刚好有个机会(不知道在抢答系统中能不能用上),首先当然要知道MQ有什么作用:
晚上没宵夜
2020/04/30
5450
RabbitMQ入门
RabbitMQ实战5.路由
注意!交换机为匿名交换机时,routing_key指的是队列名,这只是一个特例!这个参数的本意是路由键名!
章鱼喵
2018/08/22
2880
RabbitMQ实战5.路由
RabbitMQ Topic交换机
Topic交换机是RabbitMQ中最灵活和强大的一种交换机类型。它根据消息的路由键(Routing Key)和绑定键(Binding Key)之间的模式匹配,将消息发送到与之匹配的队列。通过使用通配符模式,Topic交换机可以实现精确匹配或模糊匹配的消息路由。
堕落飞鸟
2023/05/16
4050
RabbitMQ direct交换机
Direct交换机是RabbitMQ中一种常用的交换机类型。它根据消息的路由键(Routing Key)将消息发送到与之匹配的队列。每个Direct交换机都会绑定一个或多个队列,并根据消息的路由键选择性地将消息发送给匹配的队列。
堕落飞鸟
2023/05/16
3850
RabbitMQ交换机
RabbitMQ是一个功能强大的消息中间件,其中交换机(Exchange)是消息路由的核心组件之一。交换机负责接收生产者发送的消息,并将消息路由到一个或多个绑定的队列中。
堕落飞鸟
2023/05/16
7060
RabbitMQ与AMQP协议
AMQP(Advanced Message Queuing Protocol, 高级消息队列协议)是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。 RabbitMQ是一个实现了AMQP协议标准的开源消息代理和队列服务器。 1、基本概念 在服务器中,三个主要功能模块连接成一个处理链完成预期的功能: 1)“exchange”接收发布应用程序发送的消息,并根
用户1225216
2018/03/05
1.7K0
RabbitMQ与AMQP协议
RabbitMQ-案例(虚拟机创建流程)
作者介绍:简历上没有一个精通的运维工程师,下面的思维导图也是预计更新的内容和当前进度(不定时更新)
运维小路
2025/07/08
750
RabbitMQ-案例(虚拟机创建流程)
消息中间件之Rabbitmq
1、https://www.kancloud.cn/yunxifd/rabbitmq/96997
爱撒谎的男孩
2019/12/31
9900
相关推荐
RabbitMQ-消费者(Consumer)
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档