首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >消息队列 CMQ 七大功能实践案例

消息队列 CMQ 七大功能实践案例

原创
作者头像
serena
修改于 2021-08-03 06:56:12
修改于 2021-08-03 06:56:12
4.3K0
举报
文章被收录于专栏:社区的朋友们社区的朋友们

作者:庄秋涛

背景

消息队列,在业务解耦、削峰填谷、流量控制、广播消息等场景下都有很好的应用,已经成为很多企业IT系统内部通信重要手段。

现有常用的开源消息中间件有RabbitMQ、Kafka、RocketMQ等,但各自有着不同的应用场景和特点,例如,Kafka注重的是消息的吞吐量,不保证消息存储的可靠性以及一致性,因此多用于日志系统数据的上报;RabbitMQ能保证消息可靠存储投递,但性能较差。

CMQ(Cloud Message Queue)是腾讯云开发的一款高可靠、高可用、高性能的分布式消息队列服务,具有低耦合、消息可靠、强一致性、可扩展性等特点,支持Push/Pull消费模型、消息回溯、延时消息、发布订阅、路由广播、消息加密等一系列功能,以满足更多的mq应用场景。

相对Kafka,CMQ更多注重消息高可靠的应用场景,例如金融、交易、订单等业务;相比RabbitMQ,CMQ在可用性和性能上做了很大的优化和提升。更详细的对比,请参考官网介绍

本文先简单介绍CMQ底层的架构实现,然后着重结合CMQ的功能特点来介绍CMQ的实践案例,让大家快速理解和上手CMQ的开发。

底层架构

CMQ整体架构如上图所示,每个set由三个broker节点副本组成,保证消息的可靠存储以及高可用性,且基于raft算法保证数据的一致性。CMQ单个set 在CAP理论中优先保证了CP,当SET中过半数节点都正常工作时,才能进行消息的生产消费。

更详细介绍可参考这里

实践案例

一、广播拉取消息模型

CMQ支持队列(queue)和主题(topic)两种模型,如下所示:

其中,queue模型是一对一的消息拉取(pull)模式,client端主动pull消息;而topic模型,也称发布/订阅模型,是一对多的消息推送(push)模式,CMQ服务端广播消息时,根据各个订阅地址主动推送消息给client。两种模型基本能满足大部分应用场景了,对比如下:

  • queue模型,client端可以灵活根据自身能力去消费pull消息,消息实时性依赖client的消费速度,如果消费速度比生产速度慢,会引起大量消息堆积。
  • topic模型,服务端主动推送消息,消息实时性比较高,但要求client性能上能及时处理大量推送过来的消息,并且在client发生故障的时候可能会导致丢消息(有消息重发策略做基本保障)。

对于topic模型,有以下特殊场景需求:

  • client端想根据自身能力去pull消息
  • 创建订阅的时候需要暴露client端的接收消息的地址,但在一些企业内网、vpc网络等特殊情况下,CMQ无法推送到,只能用pull方式获取消息。

针对以上特殊场景,CMQ结合queue和topic两种模型实现了一对多的广播拉取消息模型,如下所示:

topic的订阅者可以是一个queue实例,topic发布消息后,会自动将消息推送到queue,然后client和使用queue模型一样去消费消息即可。

代码语言:txt
AI代码解释
复制
# python sdk demo code: create subscription of queue protocal
my_sub = my_account.get_subscription(topic_name, subscription_name)
subscription_meta = SubscriptionMeta()
subscription_meta.Endpoint = "queue1"
subscription_meta.Protocal = "queue"
my_sub.create(subscription_meta)

二、Pull长轮询

对于Queue模型,消费者需要pull获取消息,但问题是:消费者不知道队列什么时候有消息,只能不停轮询请求去pull,如果轮询间隔时间短,在队列长时间没有消息时会耗费消费者请求资源且效率低,如果轮询间隔时间长,则消费速度慢,消息实时性低,且造成消息大量堆积。

针对以上问题,CMQ解决方案是设计了长轮询功能。例如,假设设置队列长轮询时间为10s

  • 当消费者pull消息时,如果队列中有消息则马上返回
  • 如果队列暂时没有消息,消费者pull请求不会马上返回,而是会等待阻塞10s:当10s内有新的生产消息到达队列,CMQ会马上将消息投递给正在阻塞等待的消费者,消费者端感知就是阻塞的pull请求被唤醒并且收到消息返回;当10s内队列都没有消息,则请求返回告诉消费者当前队列没有消息。
代码语言:txt
AI代码解释
复制
# python sdk demo code: receive message through long polling
pollingWaitSeconds = 3
recv_msg = my_queue.receive_message(pollingWaitSeconds)

三、延时消息

CMQ提供延时消息功能:消息发送到队列后,从入队时间算起,消息在设置的延时时间后才对消费者可见,即才能被消费者消费到。延时消息功能可以很轻松实现一些定时任务的应用场景。

如上图所示,根据CMQ延迟消息功能实现的定时任务检查告警系统。

代码语言:txt
AI代码解释
复制
# python sdk demo code: send delayed message
msg_body = "I am delay message"
msg = Message(msg_body)
delaySeconds = 3
my_queue.send_message(msg, delaySeconds)

四、消息回溯

CMQ提供类似于Kafka的消息回溯能力,已经消费删除的消息是可以通过回溯来重新消费的。目前支持指定回溯时间点,在这个时间点开始被删除的消息可以重新消费到。此功能在一些金融业务对账、业务系统重试等场景下有很好的实用性。

最大可回溯时间点 = 当前时间 - 设置的可回溯时长。消息生产时间在这个值之前的不可回溯,之后的可回溯,如下图所示:

详细介绍参考官网说明

代码语言:txt
AI代码解释
复制
# python sdk demo code: rewind the queue
# backtrack one hour
backTrackingTime = int(time.time()) - 3600
my_queue.rewindQueue(backTrackingTime)

五、Topic路由匹配

CMQ topic模型提供类似于RabbitMQ的消息路由匹配功能,在消息广播基础上实现了消息的自动分发。

订阅者可以指定bindingKey,即路由规则,如上所示,*(星号)可以匹配一个单词,#(井号)可以匹配一个或多个单词。例如,生产者发布一个消息,且消息的路由键(routingKey)是”quick.orange.elephant”,那么该消息只会推送给消费者C1;如果routingKey=”quick.orange.rabbit”,则消息会推送给C1和C2;如果routingKey=”lazy.brown.fox”,则消息只会推送给C2。

详细介绍参考官网说明

代码语言:txt
AI代码解释
复制
# python sdk demo code:  set topic-subscription route-rule
my_sub = my_account.get_subscription(topic_name, subscription_name)
subscription_meta = SubscriptionMeta()
subscription_meta.Endpoint = "http://test.com"
subscription_meta.Protocal = "http"
subscription_meta.bindingKey = ['*.*.rabbit','lazy.#']
my_sub.create(subscription_meta)

message = Message()
message.msgBody = "route msg test"
my_topic.publish_message(message, 'quick.orange.rabbit')

六、超大消息传输

目前CMQ的队列消息大小最大限制为1MB,而当消息大小不超过64KB时,收发消息的最大QPS限制分别为正常的5k(有特殊需求可调整),当消息大小超过64KB而小于1MB时,CMQ不保证收发消息的QPS性能。因此,支持大于64KB的消息只是为了考虑业务偶尔传输少量大消息且不想做消息分片的应用场景。

一般来说,64KB的消息限制大小基本能满足大部分业务场景需求了,但在某些特殊场景下,消息数据大于64KB甚至大于1MB时,业务和CMQ如何支持这种超大消息的传输呢?这里有两种解决方案:

1.消息分片。类似IP数据包分片传输原理,生产者对消息分片标记后分别发送到队列,消费者从队列取出所有分片消息进行组装。个人方案如下:

  • 每个消息body分为header和data两部分。其中,data就是原消息分片后的内容,header包含三个标记:业务指定消息的ID号,唯一记录一个消息的ID值,具有同一个ID号的消息分片才会在消费端重新组装;分片序号(从1开始),记录一个消息分片的次序编号,消费端依据分片序号依次组装消息;下一分片是否存在的标记,如果是,说明消息包还不完整,否则消息组装完毕。
  • 由于可能存在多个消费者client,不同分片可能被不同client接收到,为了能够组装分片,需要一个集中式的地方存储所有分片并最终组装成完整的消息包,但无疑大大增加了系统设计的复杂度。

2.COS代理存储(COS是腾讯云的对象存储服务)。类似编程中的指针原理,方案如下(具体代码实现参考附件):

  • 生产者先把超大消息的数据以文件形式上传到COS,并返回消息文件的COS URL地址;
  • 生产者将URL地址作为消息发送到CMQ队列中;
  • 消费者从CMQ队列中读取消息,判断消息内容是否是COS的URL地址信息,如果是,则根据URL地址从COS下载相应的消息文件,并从文件中读取出超大消息的数据。

七、消息加密传输

腾讯云提供秘钥管理服务KMS,能对数据进行安全加密。CMQ消息加密功能有以下两种方案:

1.CMQ SDK客户端加密方案。客户端发送消息时,根据设置的CMK(KMS的秘钥ID)调用KMS生成数据秘钥接口,会返回数据秘钥的明文key以及加密后的密文key,使用明文key对消息进行本地加密,然后将加密的数据和密文key作为消息 发送给CMQ;消费者接收消息时,先获取消息中的密文key,调用KMS接口解密(不必每次均调用,可做缓存)得到对应的明文key,最后根据明文key本地解密密文数据即可。具体代码实现参考附件。

2.CMQ服务端加密方案。该方案,由CMQ服务端和KMS服务打通,CMQ自动对消息加解密,用户无感知,例如,用户通过https接口发送消息,由CMQ自动加密后存储,通过https接口接收消息时,CMQ对消息自动解密后返回给用户。此功能正在开发中。

结语

CMQ更多功能正在开发中,例如,死信队列、FIFO顺序消息等,欢迎体验:)

附件:

qc_cmq_python_sdk_sample.zip

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Maomi.MQ 2.0 | 功能强大的 .NET 消息队列通讯模型框架
Maomi.MQ 是一个简化了消息队列使用方式的通讯框架,目前支持了 RabbitMQ。
郑子铭
2025/04/06
3520
Maomi.MQ 2.0 | 功能强大的 .NET 消息队列通讯模型框架
10分钟搞懂!消息队列选型全方位对比
导语 | 消息队列是分布式系统中重要的中间件,在高性能、高可用、低耦合等系统架构中扮演着重要作用。本文对Kafka、Pulsar、RocketMQ、RabbitMQ、NSQ这几个消息队列组件进行了一些调研,并整理了相关资料,为业务对MQ中间件选型提供参考。 一、概述 消息队列是分布式系统中重要的中间件,在高性能、高可用、低耦合等系统架构中扮演着重要作用。分布式系统可以借助消息队列的能力,轻松实现以下功能: 解耦,将一个流程的上游和下游拆开,上游专注生产消息,下游专注处理消息。 广播,一个上游生产的消息轻松被
腾讯云开发者
2022/02/17
14.8K0
消息队列(二):AMQP
AMQP(Advanced Message Queuing Protocol),叫做高级消息队列协议:一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
灰子学技术
2023/10/30
1K0
消息队列(二):AMQP
高并发系列:架构优化之从BAT实际案例看消息中间件的妙用
说到Java中的队列应该都不会陌生。其具有通过先进先出,或者双端进出的方式进行数据管理;通过阻塞以达到自动平衡负载的功能。
Coder的技术之路
2021/05/14
8960
高并发系列:架构优化之从BAT实际案例看消息中间件的妙用
Redis 应用实践-消息队列
Redis是一个功能强大的内存缓存系统,同时也支持一些高级功能,例如发布/订阅、事务、Lua脚本等。其中,Redis也可以作为消息队列使用,以支持异步处理和解耦系统组件。
玖叁叁
2023/04/15
4430
Java消息队列--JMS概述
1、什么是JMS  JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持(百度百科给出的概述)。我们可以简单的理解:两个应用程序之间需要进行通信,我们使用一个JMS服务,进行中间的转发,通过JMS 的使用,我们可以解除两个程序之间的耦合。 2、JMS的优势   Asynch
九灵
2018/03/09
2.4K0
Java消息队列--JMS概述
深入了解消息队列:揭示消息队列的概念、原理以及应用场景
消息队列 (Message Queue, MQ) 用于实现服务之间的异步通信、服务解耦、流量控制、发布订阅、高并发缓冲等。
Lion 莱恩呀
2025/04/19
4510
深入了解消息队列:揭示消息队列的概念、原理以及应用场景
如何打造高可靠高性能的消息队列(ZZMQ)
互联网公司使用最频繁的服务调用组件是RPC框架,RPC同步调用有些场景并不是很适用,而这些场景刚好是一个可靠MQ的适用场景。
玄姐谈AGI
2018/07/03
2K1
【实践】消息队列RabbitMQ从入门安装到精通原理
从安装环境,配置入门,到HelloWorld实操,各种类型消息传递的演示代码,原理介绍,答疑解惑,面试题,全面介绍RabbitMQ消息队列。 RabbitMQ集群搭建另外一篇文章介绍。
辉哥
2021/01/29
1.3K0
消息队列及常见消息队列介绍
曾令武
2017/09/29
51.8K6
消息队列及常见消息队列介绍
Maomi.MQ 2.0 | 功能强大的 .NET 消息队列通讯模型框架
在本篇教程中,将介绍 Maomi.MQ.RabbitMQ 的使用方法,以便读者能够快速了解该框架的使用方式和特点。
痴者工良
2025/03/26
2610
Maomi.MQ 2.0 | 功能强大的 .NET 消息队列通讯模型框架
高性能消息队列 CKafka 核心原理介绍(上)
本文介绍了高性能消息队列CKafka的核心原理,包括设计初衷、核心概念、关键组件、工作流程、性能指标、应用场景以及消费者和生产者如何协同工作。CKafka通过集群部署、消息有序处理、可扩展的数据结构等方式实现高性能,支持大量消费者的同时维持低延迟,并通过简单的配置和API实现高效的生产和消费。
腾讯云中间件团队
2017/08/17
3.9K0
高性能消息队列 CKafka 核心原理介绍(上)
【消息队列 MQ 专栏】消息队列之 RocketMQ
RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于2017年9月25日成为 Apache 的顶级项目。作为经历过多次阿里巴巴双十一这种“超级工程”的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用。其主要特点有:
芋道源码
2018/07/31
6.7K0
【消息队列 MQ 专栏】消息队列之 RocketMQ
几种 MQ 顺序消息的实现方式
•中间件 excerpt: 常见的 MQ(包括:kafka、pulsar、rocketmq 和 rabbitmq 分别是如何实现顺序消息的呢。banner_img: >- https://pic-cdn.ewhisper.cn/img/2021/07/31/f463dc14089f025621400cb73b78e441-kafka-logo-long.png index_img: >- https://pic-cdn.ewhisper.cn/img/2021/07/31/3070fecc79d30db8e4c0135a36a2ac89-kafka-logo-tall.png abbrlink: 60020 date: 2021-10-01 16:37:31
东风微鸣
2022/04/22
2K0
几种 MQ 顺序消息的实现方式
RocketMQ消息队列的最佳实践
tags可由应用自行设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:
JavaEdge
2021/10/18
6080
消息队列kafka
在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算。
超蛋lhy
2019/04/19
1.2K0
RocketMQ深入浅出-02-详细介绍与安装
RocketMQ是一个统一消息引擎、轻量级数据处理平台。 RocketMQ是⼀款阿⾥巴巴开源的消息中间件。2016年11⽉28⽇,阿⾥巴巴向 Apache 软件基⾦会捐赠RocketMQ,成为 Apache 孵化项⽬。2017 年 9 ⽉ 25 ⽇,Apache 宣布 RocketMQ孵化成为 Apache 顶级项⽬(TLP ),成为国内⾸个互联⽹中间件在 Apache 上的顶级项⽬。它使用Java语言开发,在阿里内部,RocketMQ承接了例如“双11”等高并发场景的消息流转,能够处理万亿级别的消息。
编程大道
2021/10/28
8880
RocketMQ深入浅出-02-详细介绍与安装
消息队列基本原理和选型对比
作者:anncdchen,腾讯 PCG 后台开发工程师 消息队列使用场景 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,削峰填谷等问题。实现高性能、高可用、可伸缩和最终一致性架构。 解耦:多个服务监听、处理同一条消息,避免多次 rpc 调用。 异步消息:消息发布者不用等待消息处理的的结果。 削峰填谷:较大流量、写入场景,为下游 I/O 服务抗流量。当然大流量下就需要使用其他方案了。 消息驱动框架:在事件总线中,服务通过监听事件消息驱动服务完成相应动作。 消息队列模式 点对点模
腾讯技术工程官方号
2022/08/26
1.2K0
消息队列基本原理和选型对比
消息队列基本概念与pulsar学习
Pub-sub架构(发布/订阅),异步的服务间通信方式,适用于无服务器和微服务。发布到主题的任何消息都会立即被主题的所有订阅者接收。
千灵域
2022/06/17
4640
【进阶之路】消息队列——RocketMQ原理(三)
.markdown-body{word-break:break-word;line-height:1.75;font-weight:400;font-size:15px;overflow-x:hidden;color:#333}.markdown-body h1,.markdown-body h2,.markdown-body h3,.markdown-body h4,.markdown-body h5,.markdown-body h6{line-height:1.5;margin-top:35px;margin-bottom:10px;padding-bottom:5px}.markdown-body h1{font-size:30px;margin-bottom:5px}.markdown-body h2{padding-bottom:12px;font-size:24px;border-bottom:1px solid #ececec}.markdown-body h3{font-size:18px;padding-bottom:0}.markdown-body h4{font-size:16px}.markdown-body h5{font-size:15px}.markdown-body h6{margin-top:5px}.markdown-body p{line-height:inherit;margin-top:22px;margin-bottom:22px}.markdown-body img{max-width:100%}.markdown-body hr{border:none;border-top:1px solid #ddd;margin-top:32px;margin-bottom:32px}.markdown-body code{word-break:break-word;border-radius:2px;overflow-x:auto;background-color:#fff5f5;color:#ff502c;font-size:.87em;padding:.065em .4em}.markdown-body code,.markdown-body pre{font-family:Menlo,Monaco,Consolas,Courier New,monospace}.markdown-body pre{overflow:auto;position:relative;line-height:1.75}.markdown-body pre>code{font-size:12px;padding:15px 12px;margin:0;word-break:normal;display:block;overflow-x:auto;color:#333;background:#f8f8f8}.markdown-body a{text-decoration:none;color:#0269c8;border-bottom:1px solid #d1e9ff}.markdown-body a:active,.markdown-body a:hover{color:#275b8c}.markdown-body table{display:inline-block!important;font-size:12px;width:auto;max-width:100%;overflow:auto;border:1px solid #f6f6f6}.markdown-body thead{background:#f6f6f6;color:#000;text-align:left}.markdown-body tr:nth-child(2n){background-color:#fcfcfc}.markdown-body td,.markdown-body th{padding:12px 7px;line-height:24px}.markdown-body td{min-width:120px}.markdown-body blockquote{color:#666;padding:1px 23px;margin:22px 0;border-left:4px solid #cbcbcb;background-color:#f8f8f8}.markdown-body blockquote:after{display:block;content:""}.markdown-body blockquote>p{margin:10px 0}.markdown-body ol,.markdown-body ul{padding-left:28px}.markdown-body ol li,.markdown-body
南橘
2021/04/02
4660
【进阶之路】消息队列——RocketMQ原理(三)
相关推荐
Maomi.MQ 2.0 | 功能强大的 .NET 消息队列通讯模型框架
更多 >
LV.3
TencentProduct Manager
作者相关精选
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档