前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ Stream类型队列

RabbitMQ Stream类型队列

作者头像
雪飞鸿
发布2023-09-01 19:13:20
4210
发布2023-09-01 19:13:20
举报
文章被收录于专栏:me的随笔

RabbitMQ提供了三种类型的队列:

官方文档 对于流队列的描述是:高性能、可持久化、可复制、非破坏性消费、只追加写入的日志

使用场景:

  • 一个队列将同一条消息分发给不同消费者
  • 可重复消费消息
  • 更高的性能
    • 存储大量消息而不影响性能
    • 更高的吞吐

基本使用

生产消息:

代码语言:javascript
复制
import pika
from pika import BasicProperties
from pika.adapters.blocking_connection import BlockingChannel
from pika.spec import Basic
​
​
STREAM_QUEUE = "stream_queue"
​
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost", 5672, "/"))
channel = connection.channel()
channel.queue_declare(queue=STREAM_QUEUE, durable=True, arguments={"x-queue-type": "stream"})
​
for i in range(500, 600):
    msg = f"{i}".encode()
    channel.basic_publish("", STREAM_QUEUE, msg)
​
channel.close()
connection.close()

消费消息:

代码语言:javascript
复制
import pika
from pika import BasicProperties
from pika.adapters.blocking_connection import BlockingChannel
from pika.spec import Basic
​
​
def msg_handler(channel: BlockingChannel, method: Basic.Deliver, properties: BasicProperties, body: bytes):
    msg = f"获取消息:{body.decode()}"
    print(msg)
    channel.basic_ack(method.delivery_tag)
​
​
STREAM_QUEUE = "stream_queue"
​
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost", 5672, "/"))
channel = connection.channel()
channel.queue_declare(queue=STREAM_QUEUE, durable=True, arguments={"x-queue-type": "stream"})
​
channel.basic_qos(prefetch_count=50)
channel.basic_consume(STREAM_QUEUE, on_message_callback=msg_handler, arguments={"x-stream-offset": 290})
channel.start_consuming()
​
channel.close()
connection.close()

Offset参数

可以通过x-stream-offset来控制读取消息的位置,对于改参数值的释义见下图,详情可参考:Offset Tracking with RabbitMQ Streams

chunk

上图中有个chunk的概念,chunk就是stream队列中用于存储和传输消息的单元,一个chunk包含几条到几千条不等的消息。


Stream 插件

以上只是对Stream类型队列的简单使用,API和普通队列没有差异。若要体验完整的Stream队列特性,如:服务端消息偏移量追踪,需要启用stream插件

不启用和启用流插件功能特性对比,可参考: Stream Core vs Stream Plugin

服务端消息偏移量追踪

Stream提供了服务端消息偏移量追踪,客户端断开重连后可以从上次消费的下一个位置开始消费消息。

⚠️ 有些客户端不支持dedicated binary 协议,无法提供完整的流队列特性支持

使用docker启动一个rabbitmq服务并启用stream插件:

代码语言:javascript
复制
docker run \
 -d --name rabbitmq \
 --hostname=node1 \
 --env=RABBITMQ_NODENAME=r1 \
 --env=RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
 --volume=rabbit_erl:/var/lib/rabbitmq \
 -p 15672:15672 -p 5672:5672 -p 5552:5552 \
 rabbitmq:3-management
 
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream

这里使用rstream客户端来收发消息:

代码语言:javascript
复制
import asyncio
​
from rstream import (
    Producer
)
​
STREAM_QUEUE = "stream_queue"
CONSUMER_NAME = "py"
​
​
async def pub():
    async with Producer("localhost", 5552, username="guest", password="guest") as producer:
        await producer.create_stream(STREAM_QUEUE)
        for i in range(100, 300):
            await producer.send(STREAM_QUEUE, f"{i}".encode())
​
​
if __name__ == "__main__":
    asyncio.run(pub())

消费消息:

代码语言:javascript
复制
import asyncio
​
from rstream import (
    AMQPMessage,
    Consumer,
    ConsumerOffsetSpecification,
    MessageContext,
    OffsetType, OffsetNotFound
)
​
STREAM_QUEUE = "stream_queue"
CONSUMER_NAME = "py"
​
​
async def msg_handler(msg: AMQPMessage, context: MessageContext):
    print(msg)
    await context.consumer.store_offset(STREAM_QUEUE, CONSUMER_NAME, context.offset)
​
​
async def sub():
    consumer = Consumer("localhost", 5552, username="guest", password="guest")
    await consumer.start()
    try:
        offset = await consumer.query_offset(STREAM_QUEUE, CONSUMER_NAME)
    except OffsetNotFound:
        offset = 1
    await consumer.subscribe(STREAM_QUEUE, msg_handler,
                             offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, offset),
                             subscriber_name=CONSUMER_NAME)
    await consumer.run()
​
​
if __name__ == "__main__":
    asyncio.run(sub())

Kafka简单对比

rabbitmq

kafka

生产/消费者

queue

topic

底层消息存储

chunk

partition

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-08-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 基本使用
    • Offset参数
      • chunk
      • Stream 插件
        • 服务端消息偏移量追踪
        • Kafka简单对比
        相关产品与服务
        容器服务
        腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档