Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >分布式消息中间件TDMQ架构及使用案例最佳实践

分布式消息中间件TDMQ架构及使用案例最佳实践

原创
作者头像
邓愉悦
修改于 2020-10-26 08:24:34
修改于 2020-10-26 08:24:34
2K0
举报

背景

TDMQ是基于pulsar的金融级分布式消息中间件,是一个具备跨域、高可用、高并发的MQ。拥有原生的java、C++,Python,Go API,同时支持多种协议的接入(kafka、AMQP等)。同时支持 Kafka 协议以及 HTTP Proxy 方式接入,可为分布式应用系统提供异步解耦和削峰填谷的能力,具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。目前TDMQ已逐步成为新一代分布式云上消息中间件。能够很好的兼容和满足客户丰富的业务场景。

TDMQ的功能全景图
TDMQ的功能全景图

TDMQ的整体架构如下:

TDMQ整体架构
TDMQ整体架构

从上面TDMQ的功能全景和架构图中,可以看出,整个TDMQ是采用服务与存储相分离的架构。同时服务都是无状态的,这样的话,可以方便随时进行扩展,同时提升了整体的容灾能力。

TDMQ各组件说明

topic 介绍

topic是所有消息的集合,所有生产者的消息,都会归属到指定的topic之中, 所有在 topic 里的消息,会按照一定的规则,被切分成不同的分区(Partition)。一个分区会落靠在某一个服务器上,原理类似于 Kafka Topic Partition。

topic的架构如下所示:

topic逻辑架构图
topic逻辑架构图

topic的命名规则如下:Topic完整名称由:租户名 + 命名空间 + Topic。其中租户名为APPID,命名空间为环境变量。

topic完整命名组成
topic完整命名组成

broker 介绍

broker端架构
broker端架构

Broker负责消息的收发,数据不会真正存储在 broker,但会分配topic的控制权。

客户端实践

1、客户端初始化,创建Client(java语言为例)

(1)、使用域名的访问模式:

Map<String, String> authParams = new HashMap<>();

authParams.put("secretId", "***");

authParams.put("secretKey", "***");

authParams.put("region", "ap-guangzhou");//地域信息

PulsarClient client = PulsarClient.builder().authenticationCloud(

"com.tencent.tdmq.client.impl.auth.AuthenticationCloudCam", authParams)

.serviceUrl("pulsar://tdmq.åtencentcloud.example.com:6650").build();

(2)、用多个broker ip地址方式

Map<String, String> authParams = new HashMap<>();

authParams.put("secretId", "***********************************************");

authParams.put("secretKey", "***********************************************");

authParams.put("region", "ap-guangzhou");//地域信息

authParams.put("apiUrl", "");//腾讯云CAM地址

PulsarClient client = PulsarClient.builder().authenticationCloud(

"com.tencent.tdmq.client.impl.auth.AuthenticationCloudCam", authParams)

.serviceUrl("pulsar://host1:6650,host2:6650").build();

(3)、根据不同的网络环境,使用不同的netModel模式方法。

Map<String, String> authParams = new HashMap<>();

authParams.put("secretId", "***********************************************");

authParams.put("secretKey", "***********************************************");

authParams.put("region", "ap-guangzhou");//地域信息

PulsarClient client = PulsarClient.builder().authenticationCloud(

"com.tencent.tdmq.client.impl.auth.AuthenticationCloudCam", authParams)

.netModelKey("customNetModelKey")

.serviceUrl("pulsar://host1:6650,host2:6650").build();

(4)、生产消费的例子

//创建生产者对象

Producer<byte[]> producer = client.newProducer()

.batchingMaxBytes(1024*32)

.batchingMaxMessages(1000)

.topic(topic)

.create();

for (int i = 0; i < 5; i++) {

String value = "my-sync-message-" + i;

MessageId msgId = producer.newMessage().value(value.getBytes()).send();

System.out.println("produce sync msg id:" + msgId + ", value:" + value);

}

//创建消费者对象

Consumer<byte[]> consumer = client.newConsumer()

.topic(topic)

.subscriptionName(groupName)

.subscribe();

for (int i = 0; i < 5; i++) {

Message<byte[]> msg = consumer.receive();

String msgId = ((TopicMessageIdImpl)msg.getMessageId()).getInnerMessageId().toString();

String value = new String(msg.getValue());

System.out.println("receive msg " + msgId + ",value:" + value );

consumer.acknowledge(msg);// 确认消息

}

TDMQ消息的订阅模式

TDMQ支持3中订阅模式:独占模式、共享模式、故障转移定位模式。三种区别如图所示:

TDMQ3种订阅模式
TDMQ3种订阅模式

1、独占模式

对于一个topic来说,不管有多少个Consumer 同时存在,只会有一个Consumer是活跃的,也就是说只有一个Consumer能够收到这个topic下面的所有消息,这种模式就是Pulsar订阅模式中的独占订阅(Exclusive)。

Consumer consumer = client.newConsumer()

.topic("my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Exclusive)

.subscribe();

如果多个consumer去订阅这个topic,就好出现报错。

大于一个实例独占订阅topic
大于一个实例独占订阅topic

2、故障转移订阅

Failover(故障转移订阅)则是多个 consumer 可以附加到同一订阅。但是,对于给定的主题分区,将选择一个 consumer 作为该主题分区的主使用者,其他 consumer 将被指定为故障转移消费者,当主消费者断开连接时,分区将被重新分配给其中一个故障转移消费者,而新分配的消费者将成为新的主消费者。发生这种情况时,所有未确认的消息都将传递给新的主消费者,这类似于 Apache Kafka 中的使用者分区重新平衡。

Consumer consumer = client.newConsumer()

.topic("my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Failover)

.subscribe();

3、共享订阅

是可以将所需数量的 consumer 附加到同一订阅。消息以多个 consumer 的循环尝试分发形式传递,并且任何给定的消息仅传递给一个 consumer。当消费者断开连接时,所有传递给它并且未被确认的消息将被重新安排,以便发送给该订阅上剩余的 consumer。需要指出的是,TDMQ对consumer数量没有明确的限制。

Consumer consumer = client.newConsumer()

.topic("my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Shared)

.subscribe();

TDMQ高级特性

1、顺序消息

(1)、全局有序必须为非分区Topic

//只能允许一个消费者

Consumer<byte[]> consumer = client.newConsumer()

.subscriptionType(SubscriptionType.Exclusive)

.topic(topic)

.subscriptionName(groupName)

.subscribe();

(2)、局部有序通过设置key 来达到局部顺序的目的

//设置key,保证相同可以的消息发送到同一个分区里,只能允许一个消费者

MessageId msgId = producer.newMessage().key(key).value(value.getBytes()).send();

2、执行tag消息过滤

//单个tag生产

MessageId msgId = producer.newMessage().value(value.getBytes()) .tags("TagA").send();

//多个tag生产

producer.newMessage()

.value("my-sync-message".getBytes())

.tags("TagA", "TagB","TagC")//支持设置多个标签

.send();

//指定tag进行消费

Consumer consumer = client.newConsumer()

.topicByTag(topic, "TagA || TagB")

//.topicByTag(topic, "TagA ") 单个

//.topic(topic, "*") 订阅所有

//.topicByTagsPattern(topic, "Tag.*") 正则

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Shared)

.subscribe();

3、延时消息

//延迟

MessageId msgId = producer.newMessage().value(value.getBytes())

.deliverAfter(delay, TimeUnit.SECONDS)

.send();

//定时

MessageId msgId = producer.newMessage().value(value.getBytes())

.deliverAt(timestamp)

.send();

4、消息重试

Consumer<byte[]> consumer = client.newConsumer()

.subscriptionType(SubscriptionType.Shared)

.enableRetry(true)

.deadLetterPolicy(DeadLetterPolicy.builder()

.maxRedeliverCount(maxRedeliverCount)

.build())

.topic(topic)

.subscriptionName(groupName)

.subscribe();

//指定延迟时间

consumer.reconsumeLater(msg, 1000L, TimeUnit.MILLISECONDS);

//指定延迟等级

consumer.reconsumeLater(msg, 1);

//等级递增

consumer.reconsumeLater(msg);

5、消息压缩,//生产者设置压缩类型,支持ZLIB、ZSTD、SNAPPY、LZ4

//生产者设置压缩类型,支持ZLIB、ZSTD、SNAPPY、LZ4

Producer<byte[]> producer = client.newProducer()

.enableBatching(false)

.topic(topic)

.compressionType(CompressionType.LZ4)

.create();

6、批量发生消息

//可以设置批量的大小和消息条数

Producer<byte[]> producer = client.newProducer()

.enableBatching(true)

.batchingMaxBytes(1024*32)

.batchingMaxMessages(1000)

.topic(topic)

.create();

以上就是整个TDMQ架构,产品特性,以及客户端初始化以及消费、生产主要核心流程代码,希望对大家有帮助。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
SpringBoot整合分布式消息平台Pulsar
作为优秀的消息流平台,Pulsar 的使用越来越多,这篇文章讲解 Pulsar 的 Java 客户端。
jinjunzhu
2022/09/23
7410
SpringBoot整合分布式消息平台Pulsar
基于腾讯云tdmq消息队列封装SpringBootStarter(一)
创建好集群后,在命名空间中新建命名空间,命名空间名称可以根据实际业务场景进行区分,比如这里创建可以根据测试环境、预发布环境、生产环境等进行区分创建。
JulyWhj
2022/01/03
3K0
手把手教学--从Pulsar到TDMQ
导语:介于TDMQ还没有公网的访问功能,不可能买台CVM安装windows吧,VPN又只能支持协议类型: IKE/IPsec,意思是企业用户才能用,对于个人就只能再想办法了,但办法总比问题多。本地开发测试环境使用pulsar的单机版,生产使用TDMQ,这样怎么样,一起来看看怎么配置。
沐榕樰
2020/12/03
1.9K0
基于腾讯云tdmq消息队列封装SpringBootStarter(二)
关于腾讯云tdmq的基本使用参见《基于腾讯云tdmq消息队列封装SpringBootStarter(一)》,这里我们基于之前的内容在次进行优化封装。
JulyWhj
2022/01/03
3.4K0
号称下一代消息中间件!来看看它有多牛逼
最近这个 Apache Pulsar 消息中间件非常的火,号称下一代消息中件,今天,就一起来看看它到底有多牛逼?
民工哥
2021/07/16
5530
号称下一代消息中间件!来看看它有多牛逼
200 行代码告诉你 TDMQ 中 Pulsar 广播如何实现
导读 Pulsar 作为 Apache 社区的相对新的成员,在业界受到非常大量的关注。新产品的文档相对不齐全也是非常能够理解的。今天客户问过来广播怎么实现的,我解释了半天,又找了很多介绍产品的 PPT,最终也没有找到“官方”的文档说明这个事情。于是我就写了这篇文章,方便大家 copy/paste 。 作者介绍 徐为 腾讯云微服务团队高级解决方案构架师 毕业于欧盟 Erasmus Mundus IMMIT,获得经济和IT管理硕士学位 自2006年以来,曾就职于SonyEricsson、SAP等
腾讯云中间件团队
2021/04/01
2K0
【最佳实践】巡检项:消费者创建与健康检查
在 TDMQ Pulsar 版控制台中,订阅代表一个具体的消费者以及其对某个 Topic 的订阅关系。当一个消费者订阅了某个 Topic 之后,则该 Topic 下的消息均可以被其消费。一个订阅可以订阅多个 Topic ,例如用户在一个 Topic 下创建了一个订阅后,其不仅会订阅当前的 Topic,还会订阅系统自动创建的重试队列 Topic。
邓愉悦
2022/03/29
5880
博文推荐|整合 Spring 与 Pulsar,在 Java 中构建微服务
本文我们来探讨如何在 Java 框架——Spring 中整合 Apache Pulsar。文章阐述如何在 Java 中构建基于 Spring 的微服务。在正文内容开始前,我们先介绍 Spring。Spring 是 Java 生态中鼎鼎有名的技术框架,自诞生已有近 20 年历史。Spring 提供了极为方便的装配与控制机制,极大地降低了构建应用的难度。有了 Spring,开发者无需堆砌非业务相关的重复模板代码。基于 Spring,开发者可以如鱼得水般快速开发微服务应用,包括各类 REST API、Web 应用程序、控制台应用程序等。推荐大家深入研究 Spring。
从大数据到人工智能
2022/08/30
1.4K0
博文推荐|整合 Spring 与 Pulsar,在 Java 中构建微服务
究极缝合怪 | Pulsar核心概念和特性解读
Pulsar 是一个用于服务器到服务器的消息系统,具有多租户、高性能等优势。Pulsar 最初由 Yahoo 开发,目前由 Apache 软件基金会管理。
王知无-import_bigdata
2022/03/11
2.1K0
究极缝合怪 | Pulsar核心概念和特性解读
Pulsar与Kafka消费模型对比
kafka 属于 Stream 的消费模型,为了支持多 partition 的消费关系,引入了 consumer group 的概念,同时支持在消费端动态的 reblance 操作,当多个 Consumer 订阅了同一个 Topic 时,会根据分区策略进行消费者订阅分区的重分配。只要 consumer-group 与 topic 之间的关系发生变更,就会动态触发 reblance 操作,诸如:
王知无-import_bigdata
2019/06/20
2.9K0
Pulsar与Kafka消费模型对比
【最佳实践】巡检项:死信队列检查
死信队列是一种特殊的消息队列,用于集中处理无法被正常消费的消息的队列。当消息在重试队列中达到一定重试次数后仍未能被正常消费,TDMQ Pulsar 版会判定这条消息在当前情况下无法被消费,将其投递至死信队列。
邓愉悦
2022/03/29
3390
Apache pulsar 技术系列-- 消息重推的几种方式
Apache Pulsar 是一个多租户、高性能的服务间消息传输解决方案,支持多租户、低延时、读写分离、跨地域复制(GEO replication)、快速扩容、灵活容错等特性。在很多场景下,用户需要通过 MQ 实现消息的重新推送能力,比如超时重推、处理异常时重推等,本文介绍 Apache Pulsar 提供的几种消息重推方案。
腾讯云中间件团队
2023/08/03
9450
Apache pulsar 技术系列-- 消息重推的几种方式
Pulsar入门学习手册
Pulsar入门学习手册 https://cloud.tencent.com/developer/article/2276612?shareByChannel=link 前言 Apache Pulsa
疯狂的KK
2023/07/14
1.4K0
Pulsar入门学习手册
Pulsar 技术系列 - 深度解读Pulsar Schema
导读 Apache Pulsar 是一个多租户、高性能的服务间消息传输解决方案,支持多租户、低延时、读写分离、跨地域复制、快速扩容、灵活容错等特性。数平MQ团队对 Pulsar 做了深入调研以及大量的性能和稳定性方面的优化。本文是Pulsar技术系列中的一篇,主要介绍Pulsar Schema。 为什么使用Pulsar Schema 如果 producer 端要发送 POJO 类型的数据,则 Pulsar 需要一套序列化和反序列化工具,先将对象转化为字节数据再发送出去,下面为有无 schem
腾讯云中间件团队
2021/07/12
3.3K0
Apache Pulsar 技术系列 - Pulsar事务实现原理
导语 Apache Pulsar 是一个多租户、高性能的服务间消息传输解决方案,支持多租户、低延时、读写分离、跨地域复制、快速扩容、灵活容错等特性。腾讯云MQ Oteam Pulsar工作组对 Pulsar 做了深入调研以及大量的性能和稳定性方面优化,目前已经在TDBank、腾讯云TDMQ落地上线。本篇将简单介绍Pulsar服务端消息确认的一些概念和原理,欢迎大家阅读。 作者简介  林琳                                                           
腾讯云中间件团队
2022/03/03
2.1K0
mac 上学习k8s系列(51)延迟队列pulsar
kafka不支持延迟消息,rocketmq支持的延迟消息粒度有限,pulsar(https://github.com/apache/pulsar)采用优先队列的方式实现,支持任意粒度的延迟消息,不过,对于大量延迟比较久的消息,内存消耗会比较严重。本文学习下如何在mac上搭建pulsar,并通过go 的sdk实现消息的发布和订阅。
golangLeetcode
2022/12/17
4890
【干货】看看我司消息队列用啥,全网最接地气pulsar教程(含业务解耦demo源码)
通过简单代码demo进行讲解,pulsar在java中如何使用?如何通过pulsar进行异步解耦?......等
JavaDog程序狗
2024/09/19
3110
【干货】看看我司消息队列用啥,全网最接地气pulsar教程(含业务解耦demo源码)
为什么放弃Kafka,选择Pulsar?
Pulsar 是 Yahoo 在 2013 年创建的,2016年贡献给了 Apache 基金会,目前已经是 Apache 的顶级项目。Yahoo、Verizon、Twitter 等很多公司都在使用 Pulsar 来处理海量消息。
微观技术
2021/04/30
1.1K0
为什么放弃Kafka,选择Pulsar?
Pulsar中间件入门学习
Pulsar 是一个用于服务器到服务器的消息系统,具有多租户、高性能等优势。最初是由 Yahoo 开发,目前由 Apache 软件基金会 管理。是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和计算最佳解决方案。
java技术爱好者
2022/04/18
7200
Pulsar中间件入门学习
RocketMq的使用demo
生产者和消费者都属于MQ的客户端,都继承于ClientConfig类,ClientConfig为客户端的公共配置类。这里将客户端相关配置信息写在最前面,大家可以看了就知道大概由哪些属性了,客户端配置
名字是乱打的
2021/12/24
1.1K0
RocketMq的使用demo
推荐阅读
相关推荐
SpringBoot整合分布式消息平台Pulsar
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档