对RabbitMQ来说是新手。在浏览多个站点之后,我可以构造以下程序来并行发送多条消息。
sender.py
import pika
from threading import Thread
from queue import Queue
import multiprocessing
class MetaClass(type):
_instance = {}
def __call__(cls, *args, **kwargs):
"""
Singleton Design pattern
if the instance already exist don't create one!
"""
if cls not in cls._instance:
cls._instance[cls] = super(MetaClass, cls).__call__(*args, **kwargs)
return cls._instance[cls]
class RabbitMQConfigure(metaclass=MetaClass):
def __init__(self, queue='durable_task_queue', host="localhost", routing_key="durable_task_queue", exchange=""):
"""
Configure RabbitMQ server
"""
self.queue = queue
self.host = host
self.routing_key = routing_key
self.exchange = exchange
class RabbitMQ(Thread):
def __init__(self, rabbit_mq_server, queue1):
Thread.__init__(self)
self.rabbit_mq_server = rabbit_mq_server
self.queue1 = queue1
self._connection = pika.BlockingConnection(
pika.ConnectionParameters(self.rabbit_mq_server.host))
self._channel = self._connection.channel()
self._channel.queue_declare(queue=self.rabbit_mq_server.queue, durable=True)
def __enter__(self):
print("__enter__")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
print("__exit__")
self._connection.close()
def publish(self, message=""):
print("Inside publish method...")
self._channel.basic_publish(exchange=self.rabbit_mq_server.exchange,
routing_key=self.rabbit_mq_server.routing_key, body=message)
print(" [x] Sent %r" % message)
def run(self):
i = 0
while i <= 5:
# Get the work from the queue and expand the tuple
message = self.queue1.get()
print("Inside run method...")
print("Going to call publish method...")
print("Message value:" + message)
self.publish(message=message)
i += 1
if __name__ == "__main__":
rabbit_mq_server = RabbitMQConfigure(queue='durable_task_queue', host="localhost", routing_key='durable_task_queue',
exchange="")
# with RabbitMQ(rabbit_mq_server, message="Hello World!") as rabbitmq:
# rabbitmq.publish()
queue1 = Queue()
no_of_CPUs = multiprocessing.cpu_count()
messages = []
for i in range(5):
messages.append("Hello world1" + str(i))
for x in range(2):
with RabbitMQ(rabbit_mq_server, queue1) as rabbitmq:
# rabbitmq.daemon = True
rabbitmq.start()
# Put the tasks into the queue as a tuple
for message in messages:
queue1.put(message)
# Causes the main thread to wait for the queue to finish processing all the tasks
queue1.join()但是这个程序总是产生以下输出而不发送任何消息:
E:\rabbitmq\venv\Scripts\python.exe E:/rabbitmq/work_queues/new_task.py
__enter__
__exit__
__enter__
__exit__ Inside run method...Inside run method... Going to call publish method... Going to call publish method...
Message value:Hello world11Message value:Hello world10 Inside publish method...
Inside publish method... Exception in thread Exception in thread Thread-3: Traceback (most recent call last): File "C:\Python38\lib\threading.py", line 932, in _bootstrap_inner Thread-1: Traceback (most recent call last): File "C:\Python38\lib\threading.py", line 932, in _bootstrap_inner
self.run() File "E:/rabbitmq/work_queues/new_task.py", line 79, in run self.run() File "E:/rabbitmq/work_queues/new_task.py", line 79, in run
self.publish(message=message) File "E:/rabbitmq/work_queues/new_task.py", line 67, in publish
self.publish(message=message) File "E:/rabbitmq/work_queues/new_task.py", line 67, in publish
self._channel.basic_publish(exchange=self.rabbit_mq_server.exchange, File "E:\rabbitmq\venv\lib\site-packages\pika\adapters\blocking_connection.py", line 2242, in basic_publish
self._channel.basic_publish(exchange=self.rabbit_mq_server.exchange, File "E:\rabbitmq\venv\lib\site-packages\pika\adapters\blocking_connection.py", line 2242, in basic_publish
self._impl.basic_publish( self._impl.basic_publish( File "E:\rabbitmq\venv\lib\site-packages\pika\channel.py", line 421, in basic_publish
File "E:\rabbitmq\venv\lib\site-packages\pika\channel.py", line 421, in basic_publish
self._raise_if_not_open() File "E:\rabbitmq\venv\lib\site-packages\pika\channel.py", line 1389, in
_raise_if_not_open
self._raise_if_not_open() File "E:\rabbitmq\venv\lib\site-packages\pika\channel.py", line 1389, in
_raise_if_not_open
raise exceptions.ChannelWrongStateError('Channel is closed.') pika.exceptions. raise exceptions.ChannelWrongStateError('Channel is closed.') ChannelWrongStateError: Channel is closed.pika.exceptions.ChannelWrongStateError: Channel is closed. 是否可以并行发送多条消息?是否有可能一次性使用所有这些信息?
发布于 2020-08-01 16:53:57
队列是相互独立的,对于每个队列中的单个消息来说是一样的。不过,您可以控制订阅哪个队列的消费者,但仅此而已。
如果消息是并行发送和使用的,为什么不创建一个包含所有有效负载的大消息呢?
https://stackoverflow.com/questions/63154385
复制相似问题