首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何并行发送多条消息,并使用RabbitMQ一次性使用它们?

如何并行发送多条消息,并使用RabbitMQ一次性使用它们?
EN

Stack Overflow用户
提问于 2020-07-29 13:10:22
回答 1查看 454关注 0票数 0

对RabbitMQ来说是新手。在浏览多个站点之后,我可以构造以下程序来并行发送多条消息。

sender.py

代码语言:javascript
复制
import pika
from threading import Thread
from queue import Queue
import multiprocessing


class MetaClass(type):
    _instance = {}

    def __call__(cls, *args, **kwargs):
        """
        Singleton Design pattern
        if the instance already exist don't create one!
        """
        if cls not in cls._instance:
            cls._instance[cls] = super(MetaClass, cls).__call__(*args, **kwargs)
            return cls._instance[cls]


class RabbitMQConfigure(metaclass=MetaClass):
    def __init__(self, queue='durable_task_queue', host="localhost", routing_key="durable_task_queue", exchange=""):
        """
        Configure RabbitMQ server
        """
        self.queue = queue
        self.host = host
        self.routing_key = routing_key
        self.exchange = exchange


class RabbitMQ(Thread):
    def __init__(self, rabbit_mq_server, queue1):
        Thread.__init__(self)
        self.rabbit_mq_server = rabbit_mq_server
        self.queue1 = queue1

        self._connection = pika.BlockingConnection(
            pika.ConnectionParameters(self.rabbit_mq_server.host))
        self._channel = self._connection.channel()
        self._channel.queue_declare(queue=self.rabbit_mq_server.queue, durable=True)

    def __enter__(self):
        print("__enter__")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print("__exit__")
        self._connection.close()

    def publish(self, message=""):
        print("Inside publish method...")
        self._channel.basic_publish(exchange=self.rabbit_mq_server.exchange,
                                    routing_key=self.rabbit_mq_server.routing_key, body=message)
        print(" [x] Sent %r" % message)

    def run(self):
        i = 0
        while i <= 5:
            # Get the work from the queue and expand the tuple
            message = self.queue1.get()
            print("Inside run method...")
            print("Going to call publish method...")
            print("Message value:" + message)
            self.publish(message=message)
            i += 1


if __name__ == "__main__":
    rabbit_mq_server = RabbitMQConfigure(queue='durable_task_queue', host="localhost", routing_key='durable_task_queue',
                                         exchange="")
    # with RabbitMQ(rabbit_mq_server, message="Hello World!") as rabbitmq:
    #     rabbitmq.publish()

    queue1 = Queue()
    no_of_CPUs = multiprocessing.cpu_count()

    messages = []
    for i in range(5):
        messages.append("Hello world1" + str(i))

    for x in range(2):
        with RabbitMQ(rabbit_mq_server, queue1) as rabbitmq:
            # rabbitmq.daemon = True
            rabbitmq.start()

    # Put the tasks into the queue as a tuple
    for message in messages:
        queue1.put(message)
    # Causes the main thread to wait for the queue to finish processing all the tasks
    queue1.join()

但是这个程序总是产生以下输出而不发送任何消息:

代码语言:javascript
复制
E:\rabbitmq\venv\Scripts\python.exe E:/rabbitmq/work_queues/new_task.py
__enter__
__exit__
__enter__
__exit__ Inside run method...Inside run method... Going to call publish method... Going to call publish method...

Message value:Hello world11Message value:Hello world10 Inside publish method...

Inside publish method... Exception in thread Exception in thread Thread-3: Traceback (most recent call last):   File "C:\Python38\lib\threading.py", line 932, in _bootstrap_inner Thread-1: Traceback (most recent call last):   File "C:\Python38\lib\threading.py", line 932, in _bootstrap_inner
        self.run()   File "E:/rabbitmq/work_queues/new_task.py", line 79, in run self.run()   File "E:/rabbitmq/work_queues/new_task.py", line 79, in run
    self.publish(message=message)   File "E:/rabbitmq/work_queues/new_task.py", line 67, in publish
    self.publish(message=message)   File "E:/rabbitmq/work_queues/new_task.py", line 67, in publish
    self._channel.basic_publish(exchange=self.rabbit_mq_server.exchange,   File "E:\rabbitmq\venv\lib\site-packages\pika\adapters\blocking_connection.py", line 2242, in basic_publish
    self._channel.basic_publish(exchange=self.rabbit_mq_server.exchange,   File "E:\rabbitmq\venv\lib\site-packages\pika\adapters\blocking_connection.py", line 2242, in basic_publish
        self._impl.basic_publish( self._impl.basic_publish(  File "E:\rabbitmq\venv\lib\site-packages\pika\channel.py", line 421, in basic_publish

  File "E:\rabbitmq\venv\lib\site-packages\pika\channel.py", line 421, in basic_publish
    self._raise_if_not_open()   File "E:\rabbitmq\venv\lib\site-packages\pika\channel.py", line 1389, in
_raise_if_not_open
    self._raise_if_not_open()   File "E:\rabbitmq\venv\lib\site-packages\pika\channel.py", line 1389, in
_raise_if_not_open
    raise exceptions.ChannelWrongStateError('Channel is closed.') pika.exceptions.    raise exceptions.ChannelWrongStateError('Channel is closed.') ChannelWrongStateError: Channel is closed.pika.exceptions.ChannelWrongStateError:  Channel is closed. 

是否可以并行发送多条消息?是否有可能一次性使用所有这些信息?

EN

回答 1

Stack Overflow用户

发布于 2020-08-01 16:53:57

队列是相互独立的,对于每个队列中的单个消息来说是一样的。不过,您可以控制订阅哪个队列的消费者,但仅此而已。

如果消息是并行发送和使用的,为什么不创建一个包含所有有效负载的大消息呢?

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63154385

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档