TDMQ是基于pulsar的金融级分布式消息中间件,是一个具备跨域、高可用、高并发的MQ。拥有原生的java、C++,Python,Go API,同时支持多种协议的接入(kafka、AMQP等)。同时支持 Kafka 协议以及 HTTP Proxy 方式接入,可为分布式应用系统提供异步解耦和削峰填谷的能力,具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。目前TDMQ已逐步成为新一代分布式云上消息中间件。能够很好的兼容和满足客户丰富的业务场景。
TDMQ的整体架构如下:
从上面TDMQ的功能全景和架构图中,可以看出,整个TDMQ是采用服务与存储相分离的架构。同时服务都是无状态的,这样的话,可以方便随时进行扩展,同时提升了整体的容灾能力。
topic是所有消息的集合,所有生产者的消息,都会归属到指定的topic之中, 所有在 topic 里的消息,会按照一定的规则,被切分成不同的分区(Partition)。一个分区会落靠在某一个服务器上,原理类似于 Kafka Topic Partition。
topic的架构如下所示:
topic的命名规则如下:Topic完整名称由:租户名 + 命名空间 + Topic。其中租户名为APPID,命名空间为环境变量。
Broker负责消息的收发,数据不会真正存储在 broker,但会分配topic的控制权。
(1)、使用域名的访问模式:
Map<String, String> authParams = new HashMap<>();
authParams.put("secretId", "***");
authParams.put("secretKey", "***");
authParams.put("region", "ap-guangzhou");//地域信息
PulsarClient client = PulsarClient.builder().authenticationCloud(
"com.tencent.tdmq.client.impl.auth.AuthenticationCloudCam", authParams)
.serviceUrl("pulsar://tdmq.åtencentcloud.example.com:6650").build();
(2)、用多个broker ip地址方式
Map<String, String> authParams = new HashMap<>();
authParams.put("secretId", "***********************************************");
authParams.put("secretKey", "***********************************************");
authParams.put("region", "ap-guangzhou");//地域信息
authParams.put("apiUrl", "");//腾讯云CAM地址
PulsarClient client = PulsarClient.builder().authenticationCloud(
"com.tencent.tdmq.client.impl.auth.AuthenticationCloudCam", authParams)
.serviceUrl("pulsar://host1:6650,host2:6650").build();
(3)、根据不同的网络环境,使用不同的netModel模式方法。
Map<String, String> authParams = new HashMap<>();
authParams.put("secretId", "***********************************************");
authParams.put("secretKey", "***********************************************");
authParams.put("region", "ap-guangzhou");//地域信息
PulsarClient client = PulsarClient.builder().authenticationCloud(
"com.tencent.tdmq.client.impl.auth.AuthenticationCloudCam", authParams)
.netModelKey("customNetModelKey")
.serviceUrl("pulsar://host1:6650,host2:6650").build();
(4)、生产消费的例子
//创建生产者对象
Producer<byte[]> producer = client.newProducer()
.batchingMaxBytes(1024*32)
.batchingMaxMessages(1000)
.topic(topic)
.create();
for (int i = 0; i < 5; i++) {
String value = "my-sync-message-" + i;
MessageId msgId = producer.newMessage().value(value.getBytes()).send();
System.out.println("produce sync msg id:" + msgId + ", value:" + value);
}
//创建消费者对象
Consumer<byte[]> consumer = client.newConsumer()
.topic(topic)
.subscriptionName(groupName)
.subscribe();
for (int i = 0; i < 5; i++) {
Message<byte[]> msg = consumer.receive();
String msgId = ((TopicMessageIdImpl)msg.getMessageId()).getInnerMessageId().toString();
String value = new String(msg.getValue());
System.out.println("receive msg " + msgId + ",value:" + value );
consumer.acknowledge(msg);// 确认消息
}
TDMQ支持3中订阅模式:独占模式、共享模式、故障转移定位模式。三种区别如图所示:
1、独占模式
对于一个topic来说,不管有多少个Consumer 同时存在,只会有一个Consumer是活跃的,也就是说只有一个Consumer能够收到这个topic下面的所有消息,这种模式就是Pulsar订阅模式中的独占订阅(Exclusive)。
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
如果多个consumer去订阅这个topic,就好出现报错。
2、故障转移订阅
Failover(故障转移订阅)则是多个 consumer 可以附加到同一订阅。但是,对于给定的主题分区,将选择一个 consumer 作为该主题分区的主使用者,其他 consumer 将被指定为故障转移消费者,当主消费者断开连接时,分区将被重新分配给其中一个故障转移消费者,而新分配的消费者将成为新的主消费者。发生这种情况时,所有未确认的消息都将传递给新的主消费者,这类似于 Apache Kafka 中的使用者分区重新平衡。
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.subscribe();
3、共享订阅
是可以将所需数量的 consumer 附加到同一订阅。消息以多个 consumer 的循环尝试分发形式传递,并且任何给定的消息仅传递给一个 consumer。当消费者断开连接时,所有传递给它并且未被确认的消息将被重新安排,以便发送给该订阅上剩余的 consumer。需要指出的是,TDMQ对consumer数量没有明确的限制。
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
1、顺序消息
(1)、全局有序必须为非分区Topic
//只能允许一个消费者
Consumer<byte[]> consumer = client.newConsumer()
.subscriptionType(SubscriptionType.Exclusive)
.topic(topic)
.subscriptionName(groupName)
.subscribe();
(2)、局部有序通过设置key 来达到局部顺序的目的
//设置key,保证相同可以的消息发送到同一个分区里,只能允许一个消费者
MessageId msgId = producer.newMessage().key(key).value(value.getBytes()).send();
2、执行tag消息过滤
//单个tag生产
MessageId msgId = producer.newMessage().value(value.getBytes()) .tags("TagA").send();
//多个tag生产
producer.newMessage()
.value("my-sync-message".getBytes())
.tags("TagA", "TagB","TagC")//支持设置多个标签
.send();
//指定tag进行消费
Consumer consumer = client.newConsumer()
.topicByTag(topic, "TagA || TagB")
//.topicByTag(topic, "TagA ") 单个
//.topic(topic, "*") 订阅所有
//.topicByTagsPattern(topic, "Tag.*") 正则
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
3、延时消息
//延迟
MessageId msgId = producer.newMessage().value(value.getBytes())
.deliverAfter(delay, TimeUnit.SECONDS)
.send();
//定时
MessageId msgId = producer.newMessage().value(value.getBytes())
.deliverAt(timestamp)
.send();
4、消息重试
Consumer<byte[]> consumer = client.newConsumer()
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliverCount)
.build())
.topic(topic)
.subscriptionName(groupName)
.subscribe();
//指定延迟时间
consumer.reconsumeLater(msg, 1000L, TimeUnit.MILLISECONDS);
//指定延迟等级
consumer.reconsumeLater(msg, 1);
//等级递增
consumer.reconsumeLater(msg);
5、消息压缩,//生产者设置压缩类型,支持ZLIB、ZSTD、SNAPPY、LZ4
//生产者设置压缩类型,支持ZLIB、ZSTD、SNAPPY、LZ4
Producer<byte[]> producer = client.newProducer()
.enableBatching(false)
.topic(topic)
.compressionType(CompressionType.LZ4)
.create();
6、批量发生消息
//可以设置批量的大小和消息条数
Producer<byte[]> producer = client.newProducer()
.enableBatching(true)
.batchingMaxBytes(1024*32)
.batchingMaxMessages(1000)
.topic(topic)
.create();
以上就是整个TDMQ架构,产品特性,以及客户端初始化以及消费、生产主要核心流程代码,希望对大家有帮助。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有