Apache Pulsar 是一个多租户、高性能的服务间消息传输解决方案,支持多租户、低延时、读写分离、跨地域复制(GEO replication)、快速扩容、灵活容错等特性。在很多场景下,用户需要使用到延迟消息,本文是 Pulsar 技术系列中的一篇,主要介绍 Pulsar 3.x 大规模延迟消息投递的实现。
背景
之前有文章介绍过延迟消息的使用场景、使用方式以及实现原理,同时也提出了当时版本的局限性,完全基于内存构建延迟消息索引,导致无法支持大规模延迟消息场景。Pulsar 在 3.x 版本支持了基于磁盘的延迟消息索引方案,使得内存不再是延迟消息规模的瓶颈。
大规模延迟消息方案
Pulsar 消费流程
在讲解延迟消息之前,先简单描述一下 Pulsar 服务端消费处理流程。Broker 为每个订阅维护单独的 Dispatcher 对象,Dispatcher 负责管理整个订阅的消费。大致流程如下:
可以简单理解为,Dispatcher 持续往后读取数据,已过期延迟消息就和普通消息一样推送给客户端。其余延迟消息会被添加到 DelayedDeliveryTracker,数据到期后再推送给客户端。
DelayedDeliveryTracker
延迟消息是由 DelayedDeliveryTracker 管理,从上图中可以看到,DelayedDeliveryTracker 主要功能为添加消息以及读取已经到期的消息,代码片段如下:
public interface DelayedDeliveryTracker extends AutoCloseable {
# 添加消息
boolean addMessage(long ledgerId, long entryId, long deliveryAt);
# 获取到期消息
NavigableSet<PositionImpl> getScheduledMessages(int maxMessages);
# 其余为辅助方法
boolean hasMessageAvailable();
long getNumberOfDelayedMessages();
long getBufferMemoryUsage();
boolean shouldPauseAllDeliveries();
void resetTickTime(long tickTime);
CompletableFuture<Void> clear();
void close();
}
可以看到,DeliveryTracker 只添加 Position 信息,Data 信息是不需要保存的。目前具体的实现 Tracker 有两个,InMemoryDelayedDeliveryTracker 和 BucketDelayedDeliveryTracker,前者为完全基于内存的老版本实现,后者为基于 BK 存储支持超大规模延迟消息的实现,下文主要分析 BucketDelayedDeliveryTracker 的实现原理。
BucketDelayedDeliveryTracker
如何存储
Broker 设计为无状态服务,所以基于磁盘的 DelayedDeliveryTracker 是 Bookie 来负责。BucketDelayedDeliveryTracker 为了降低磁盘的写入次数(写入 Bookie),会积累到一定量延迟消息索引后再触发写入。可以看到 Bucket 生命周期和 Ledgers 是很类似的,只有最后一个 Bucket 支持写入(LastMutableBucket),前面的 Bucket 只支持读取(ImmutableBuckets),读取完成后会删除。由于 Ledger 是递增的,所以可以看到每个 Bucket 中 Ledger 也是递增的,每个 Bucket 负责存储一定范围的 Ledger 延迟消息。
数据加入 DeliveryTracker 具体流程如下:
当 LastMutableBucket 累计到一定量的延迟消息后(默认 5w,会存储完最后一个 Ledger 的全部延迟消息再切换),会触发刷盘, 具体步骤如下:
可以看到,整个 Bucket 通过几次 Append 写入就能把全部的延迟信息落盘,已写入的数据不能修改,只能删除。
如何读取
BucketDelayedDeliveryTracker 在 Addmsg 时已经把延迟消息写入到磁盘,内存中只会存储部分的延迟消息。在 BucketDelayedDeliveryTracker 中内存中的延迟消息都存储在 SharedBucketPriorityQueue(小堆实现的优先级队列)中。所有的延迟消息都通过 SharedBucketPriorityQueue 来获取。
读取相对来说比较简单, 大致流程如下:
何时删除
ImmutableBucket 被使用完后,在以下几个时机会被删除。
Bucket 数据丢失
如果延迟消息在 LastMutableBucket 中还没有刷盘到 Bookie,此时发生故障,LastMutableBucket 中的数据将会丢失。但是这并不会有什么影响,在重新启动后,依然会从 MackDelete 位置往后读取消息,重建 Bucket。这也就是为什么 Bucket 中的数据只需要被读走(不需要客户端 Ack)就可以被删除。
重建 Bucket
重建 Bucket 其余与第一次构建 Bucket 是一致的,都是往后读取消息,未过期的延迟消息重新加入 Bucket 中,已过期和延迟消息会被当做普通消息直接推送给客户端。重建 Bucket 过程资源消耗可控,也不会阻塞消费,暂时不必担心重建 Bucket 对消费造成额外压力。
+3
总结
Pulsar 3.x 版本大规模延迟消息方案整体比较简单,采用先分桶,再分段的策略,只在内存中保存最近的延迟消息,延迟消息规模将不再受到内存的限制。新版本实际压测下来与设计基本一致,内存占用达到稳定后将不会上涨,目前验证单节点数十亿延迟消息稳定运行。
未来规划
当前的实现版本 DelayedDeliveryTracker 是基于订阅维度,如果 Topic 下有很多订阅,占用的内存和磁盘存储会随着订阅数量等比例放大。目前这里有一定的优化空间,可以 Topic 下的多订阅共享 DeliveryTracker 存储,类似每个订阅有单独的 ReadPosition 即可。
往期推荐
《腾讯云 TDMQ 产品家族新成员:消息队列 MQTT 版全新发布!》
《图解Kafka:架构设计、消息可靠、数据持久、高性能背后的底层原理》
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有