Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Apache Pulsar 技术系列 - 大规模延迟消息解析

Apache Pulsar 技术系列 - 大规模延迟消息解析

作者头像
腾讯云中间件团队
发布于 2025-02-26 03:09:30
发布于 2025-02-26 03:09:30
15000
代码可运行
举报
运行总次数:0
代码可运行
导语

Apache Pulsar 是一个多租户、高性能的服务间消息传输解决方案,支持多租户、低延时、读写分离、跨地域复制(GEO replication)、快速扩容、灵活容错等特性。在很多场景下,用户需要使用到延迟消息,本文是 Pulsar 技术系列中的一篇,主要介绍 Pulsar 3.x 大规模延迟消息投递的实现。

背景

之前有文章介绍过延迟消息的使用场景、使用方式以及实现原理,同时也提出了当时版本的局限性,完全基于内存构建延迟消息索引,导致无法支持大规模延迟消息场景。Pulsar 在 3.x 版本支持了基于磁盘的延迟消息索引方案,使得内存不再是延迟消息规模的瓶颈。

大规模延迟消息方案

Pulsar 消费流程

在讲解延迟消息之前,先简单描述一下 Pulsar 服务端消费处理流程。Broker 为每个订阅维护单独的 Dispatcher 对象,Dispatcher 负责管理整个订阅的消费。大致流程如下:

  1. Dispatcher 启动后初始化 Markdelete(已 Ack 最小位置)和 ReadPosition(当前读位置)(起始时 ReadPosition 为 MarkDelete 的下一个位置)。
  2. 如果 RedeliveryTracker 中有数据,优先推送 RedeliveryTracker 中数据, 否则从 Bookie 中读取数据。
  3. 普通数据: 推送数据给合适的 Consumer,Consumer 未就绪时,拉取到的数据把索引信息存储到 RedeliveryTracker 中。
  4. 延迟消息: 如果时间到了,直接推送给用户,否则添加到 DelayMesageTracker 里。
  5. DelayMesageTracker 会定时把到期数据倒入 RedeliveryTracker。

可以简单理解为,Dispatcher 持续往后读取数据,已过期延迟消息就和普通消息一样推送给客户端。其余延迟消息会被添加到 DelayedDeliveryTracker,数据到期后再推送给客户端。

DelayedDeliveryTracker

延迟消息是由 DelayedDeliveryTracker 管理,从上图中可以看到,DelayedDeliveryTracker 主要功能为添加消息以及读取已经到期的消息,代码片段如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interface DelayedDeliveryTracker extends AutoCloseable {
    # 添加消息
    boolean addMessage(long ledgerId, long entryId, long deliveryAt);
    # 获取到期消息
    NavigableSet<PositionImpl> getScheduledMessages(int maxMessages);
    
    # 其余为辅助方法
    boolean hasMessageAvailable();
    long getNumberOfDelayedMessages();
    long getBufferMemoryUsage();
    boolean shouldPauseAllDeliveries();
    void resetTickTime(long tickTime);
    CompletableFuture<Void> clear();
    void close();
}

可以看到,DeliveryTracker 只添加 Position 信息,Data 信息是不需要保存的。目前具体的实现 Tracker 有两个,InMemoryDelayedDeliveryTracker 和 BucketDelayedDeliveryTracker,前者为完全基于内存的老版本实现,后者为基于 BK 存储支持超大规模延迟消息的实现,下文主要分析 BucketDelayedDeliveryTracker 的实现原理。

BucketDelayedDeliveryTracker

如何存储

Broker 设计为无状态服务,所以基于磁盘的 DelayedDeliveryTracker 是 Bookie 来负责。BucketDelayedDeliveryTracker 为了降低磁盘的写入次数(写入 Bookie),会积累到一定量延迟消息索引后再触发写入。可以看到 Bucket 生命周期和 Ledgers 是很类似的,只有最后一个 Bucket 支持写入(LastMutableBucket),前面的 Bucket 只支持读取(ImmutableBuckets),读取完成后会删除。由于 Ledger 是递增的,所以可以看到每个 Bucket 中 Ledger 也是递增的,每个 Bucket 负责存储一定范围的 Ledger 延迟消息。

数据加入 DeliveryTracker 具体流程如下:

  1. 判断是否存在,如果存在了,直接返回(Bucket 内的 Metadata 有 Bitmap 快速标识哪些 Entry 是已存在的)。
  2. 如果需要加入的消息 LedgerId 不包含于 LastMutableBucket 中的话,说明之前的数据没有存储到(不应该出现)。直接放到 Share 池子里面。
  3. 其余正常情况会直接存储在 LastMutableBucket 中,LastMutableBucket 中的延迟消息累计到一定数量后会生成新的 ImmutableBucket 并刷入磁盘。
  4. 刷盘成功后,会清空 LastMutableBucket 数据重新接收新的延迟消息写入。

当 LastMutableBucket 累计到一定量的延迟消息后(默认 5w,会存储完最后一个 Ledger 的全部延迟消息再切换),会触发刷盘, 具体步骤如下:

  1. LastMutableBucket 内存中的延迟消息,按照时间(5min)、条数(5000)维度分成多个 Segment(简单理解第一个 Segment 是最新5分钟的消息,第二个是最近5~10分钟消息)。
  2. 生成好每个 Segement 的元数据(包含每个 Segement 中消息的最大和最小时间,以及用来快速判断某个消息是否是延迟消息的 Bitmap)。
  3. 创建一个新的 Ledger,并把元数据存储在对应 Cursor Property 中。格式为 <#pulsar.internal.delayed.bucket-
  4. 将元数据写入 Entry0 位置,其余 Segment 分别作为一个 Entry 顺序写入,并关闭 Ledger。

可以看到,整个 Bucket 通过几次 Append 写入就能把全部的延迟信息落盘,已写入的数据不能修改,只能删除。

如何读取

BucketDelayedDeliveryTracker 在 Addmsg 时已经把延迟消息写入到磁盘,内存中只会存储部分的延迟消息。在 BucketDelayedDeliveryTracker 中内存中的延迟消息都存储在 SharedBucketPriorityQueue(小堆实现的优先级队列)中。所有的延迟消息都通过 SharedBucketPriorityQueue 来获取。

读取相对来说比较简单, 大致流程如下:

  1. 每次拉取延迟消息时会先把 LastMutableBucket 中到期的延迟消息转移到 SharedBucketPriorityQueue 中。
  2. 在每个 ImmutableBucket 刷磁盘时,已经把第一个Segment 加载到 SharedBucketPriorityQueue 中。
  3. 当 ImmutableBucket 中的当前 Segement 中的最后一条消息被获取后,会触发从 Bookie 中加载下一个 Segement 存入 SharedBucketPriorityQueue。

何时删除

ImmutableBucket 被使用完后,在以下几个时机会被删除。

  1. Bucket 中的最后一个 Segement 的最后一条数据被读取后(实现上为加载下一个 Segment 发现未空时)。
  2. 订阅重新加载时(分区 Leadership 发生变化)时,如果 Bucket 中的延迟消息都已到期。
  3. Bucket 触发 Merger 后(可控制内存中的 Bucket 个数进而控制内存消耗),会删除原先的 Bucket。用户设置了 MaxNumBuckets,已存在 Bucket 个数大于这个配置值时,挑选若干个 Bucket 合并成一个 Bucket,并删除掉原有的 Bucket。
  4. 用户调用 org.apache.pulsar.client.admin.Topics#skipAllMessages。
  5. ImmutableBucket 删除时会先清空 ZK 中的元数据,再删除对应的 Ledger。

Bucket 数据丢失

如果延迟消息在 LastMutableBucket 中还没有刷盘到 Bookie,此时发生故障,LastMutableBucket 中的数据将会丢失。但是这并不会有什么影响,在重新启动后,依然会从 MackDelete 位置往后读取消息,重建 Bucket。这也就是为什么 Bucket 中的数据只需要被读走(不需要客户端 Ack)就可以被删除。

重建 Bucket

重建 Bucket 其余与第一次构建 Bucket 是一致的,都是往后读取消息,未过期的延迟消息重新加入 Bucket 中,已过期和延迟消息会被当做普通消息直接推送给客户端。重建 Bucket 过程资源消耗可控,也不会阻塞消费,暂时不必担心重建 Bucket 对消费造成额外压力。

+3

总结

Pulsar 3.x 版本大规模延迟消息方案整体比较简单,采用先分桶,再分段的策略,只在内存中保存最近的延迟消息,延迟消息规模将不再受到内存的限制。新版本实际压测下来与设计基本一致,内存占用达到稳定后将不会上涨,目前验证单节点数十亿延迟消息稳定运行。

未来规划

当前的实现版本 DelayedDeliveryTracker 是基于订阅维度,如果 Topic 下有很多订阅,占用的内存和磁盘存储会随着订阅数量等比例放大。目前这里有一定的优化空间,可以 Topic 下的多订阅共享 DeliveryTracker 存储,类似每个订阅有单独的 ReadPosition 即可。

往期推荐

《腾讯云 TDMQ 产品家族新成员:消息队列 MQTT 版全新发布!

猫眼在腾讯云北极星上的最佳实践

基于 RocketMQ 实现 AMQP 协议实践

图解Kafka:架构设计、消息可靠、数据持久、高性能背后的底层原理

腾讯云上基于 Apache Pulsar 的大规模生产实践

万字长文分享腾讯云原生微服务治理实践及企业落地建议

《基于 TDMQ for Apache Pulsar 的跨地域复制实践

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-02-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 腾讯云中间件 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
云原生消息流系统 Apache Pulsar 在腾讯云的大规模生产实践
导语 由 InfoQ 主办的 Qcon 全球软件开发者大会北京站上周已精彩落幕,腾讯云中间件团队的冉小龙参与了《云原生机构设计与音视频技术应用》专题,带来了以《云原生消息流系统 Apache Pulsar 在腾讯云的大规模生产实践》为主题的精彩演讲,在本篇内容中,将重点围绕腾讯云近期在 Apache Pulsar 稳定性和性能方面优化的工作展开介绍,为开发者提供参考。 作者简介 冉小龙 腾讯云高级研发工程师 Apache Pulsar committer RoP maintainer Apache Pul
腾讯云中间件团队
2023/02/16
1.2K0
云原生消息流系统 Apache Pulsar 在腾讯云的大规模生产实践
深入解析Apache Pulsar系列(二) —— Broker消息确认的管理
导语 我们在之前的《深入解析Apache Pulsar系列之一 —— 客户端消息确认》中介绍过Apache Pulsar客户端的多种消息确认模式。这篇文章中,我们将介绍Broker侧对于消息确认的管理。 作者简介 林琳 腾讯云中间件专家工程师 Apache Pulsar PMC,《深入解析Apache Pulsar》作者。目前专注于中间件领域,在消息队列和微服务方向具有丰富的经验。负责TDMQ的设计与开发工作,目前致力于打造稳定、高效和可扩展的基础组件与服务。 客户端通过消息确认机制通知Bro
腾讯云中间件团队
2021/12/21
2K0
百万级 Topic,Apache Pulsar 在腾讯云的稳定性优化实践
作者简介 冉小龙 腾讯云高级研发工程师 Apache Pulsar committer RoP maintainer Apache Pulsar Go Client、Pulsarctl 与 Go Functions 作者与主要维护者 Apache Pulsar 作为云原生时代消息流系统,采用存储计算分离架构,支持大集群、多租户、百万级 Topic、跨地域数据复制、持久化存储、分层存储、高可扩展性等企业级和金融级功能。Apache Pulsar 提供了统一的消费模型,支持消息队列和流两种场景,既能为队列场景提
腾讯云中间件团队
2022/09/27
1.1K0
百万级 Topic,Apache Pulsar 在腾讯云的稳定性优化实践
Apache Pulsar 技术系列 - Pulsar 总览
导语 Apache Pulsar 是一个多租户、高性能的服务间消息传输解决方案,数据持久化依赖 Apache BookKeeper 实现,支持多租户、低延时、读写分离、跨地域复制、快速扩容、灵活容错等特性。本文将从以下几个方面为大家介绍 Apache Pulsar的设计原理和特性。 1、Apache Pulsar 架构 2、架构设计的优势 3、Pulsar 特性 4、总结 Apache Pulsar 架构 存储计算分离 Apache Pulsar 是 Pub/Sub 模型的消息系统,并且从设计上做了存储和
腾讯云中间件团队
2023/03/24
1.5K1
Apache Pulsar 技术系列 - Pulsar 总览
Apache Pulsar 系列 —— 深入理解 Bookie GC 回收机制
作者简介 冉小龙 腾讯云高级研发工程师 Apache Pulsar committer RoP maintainer Apache Pulsar Go client, Go Functions 作者及主要维护者 背景 Apache Bookkeeper 是基于日志的一个持久化系统,所有的数据会以日志的形式存储到 Ledger 磁盘的 Entry Log 文件中,之后通过后台异步回收的形式来将 EntryLog 文件回收掉。但是在我们实际的使用场景中,发现很久之前的 EntryLog 文件无法被删除掉,对
腾讯云中间件团队
2022/09/08
1.1K0
Apache Pulsar 系列 —— 深入理解 Bookie GC 回收机制
Apache Pulsar 延迟消息投递解析
导语 | Apache Pulsar 是一个多租户、高性能的服务间消息传输解决方案,支持多租户、低延时、读写分离、跨地域复制、快速扩容、灵活容错等特性。腾讯数据平台部 MQ 团队对 Pulsar 做了深入调研以及大量的性能和稳定性方面优化,目前已经在腾讯云消息队列 TDMQ 落地上线。本文主要介绍Pulsar延迟消息投递的实现,希望与大家一同交流。 一、什么是延迟消息投递 延迟消息投递在MQ应用场景中十分普遍,它是指消息在发送到 MQ 服务端后并不会立马投递,而是根据消息中的属性延迟固定时间后才投递给
腾讯云开发者
2021/02/04
3.4K0
pulsar总览
pulsar 是 Apache 的顶级项目, 定位为下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和计算最佳解决方案。Pulsar 是一个 pub-sub (发布-订阅)模型的消息队列系统。
leobhao
2023/03/08
1.2K0
pulsar总览
下一代消息队列pulsar到底是什么?
之前琢磨了很久一直想写一篇pulsar相关的文章,但是一直知识储备不够,对于很多细节还是不了解,于是查了很多资料,总算是可以凑出一篇文章了。
用户5397975
2021/02/03
9.3K0
下一代消息队列pulsar到底是什么?
颠覆Kafka的统治,新一代云原生消息系统Pulsar震撼来袭!
导语 | 在信息流场景,内容的请求处理、原子模块调度、结果的分发等至关重要,将会直接影响到内容的外显、推荐、排序等。基于消息100%成功的要求,我对Pulsar进行了调研,并采用Pulsar实现消息的可靠处理。本文主要参考Pulsar的官方文档和技术文章,对Pulsar的特性、机制、原理等进行整理总结。 一、Pulsar概述 Apache Pulsar是Apache软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多
腾讯云开发者
2022/01/12
8060
5000字阐述云原生消息中间件Apache Pulsar的核心特性和设计概览
Apache Pulsar 是 Apache 软件基金会顶级项目,自称是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性。
王知无-import_bigdata
2021/07/30
1K0
详解 Apache Pulsar 消息生命周期
在用户视角下,MQ 可以理解为 Pub-Sub 模型,在 Broker 抽象一个 Topic,消息经由生产者发送到 Topic 中然后进入消费者进行消费。
从大数据到人工智能
2023/03/10
1K0
详解 Apache Pulsar 消息生命周期
深度解析:Pulsar的消息存储机制和Bookie的GC机制原理
Apache Pulsar是一个多租户、高性能的服务间消息传输解决方案,支持多租户、低延时、读写分离、跨地域复制、快速扩容、灵活容错等特性。腾讯数据平台部MQ团队对Pulsar做了深入调研以及大量的性能和稳定性方面优化,目前已经在TDbank落地上线。本文是Pulsar技术系列中的一篇,主要简单梳理了Pulsar消息存储与BookKeeper存储文件的清理机制。其中,BookKeeper可以理解为一个NoSQL的存储系统,默认使用RockDB存储索引数据。
腾讯云中间件团队
2021/04/22
3.5K0
个推基于 Apache Pulsar 的优先级队列方案
当 APP 有推送需求的时候, 会向个推发送一条推送命令,接到推送需求后,我们会把APP要求推送消息的用户放入下发队列中,进行消息下发;当同时有多个APP进行消息下发时,难免会出现资源竞争的情况, 因此就产生了优先级队列的需求,在下发资源固定的情况下, 高优先级的用户需要有更多的下发资源。
个推
2019/04/15
2.9K0
个推基于 Apache Pulsar 的优先级队列方案
千亿级、大规模:腾讯超大 Apache Pulsar 集群性能调优实践
导语 近期,腾讯 TEG 数据平部 MQ 团队开发部署了一套底层运维指标性能分析系统(本文简称 Data 项目) ,目前作为通用基础设施服务整个腾讯集团。该系统旨在收集性能指标、上报数据以用于业务的运维监控,后续也将延用至前后端实时分析场景。 腾讯 Data 项目选用 Apache Pulsar 作为消息系统,其服务端采用 CVM 服务器(Cloud Virtual Machine,CVM)部署,并将生产者和消费者部署在 Kubernetes 上,该项目 Pulsar 集群是腾讯数据平台部 MQ 团队接入的
腾讯云中间件团队
2022/07/27
1K0
千亿级、大规模:腾讯超大 Apache Pulsar 集群性能调优实践
Pulsar 介绍与部署
Apache Pulsar 是灵活的发布-订阅消息系统(Flexible Pub/Sub messaging),采用计算与存储分离的架构。雅虎在 2013 年开始开发 Pulsar ,于 2016 年首次开源,目前是 Apache 软件基金会的顶级项目。Pulsar 具有支持多租户、持久化存储、多机房跨区域数据复制、高吞吐、低延迟等特性。
Se7en258
2021/07/01
3.2K0
案例推荐|千亿级、大规模:腾讯超大 Apache Pulsar 集群性能调优实践
近期,腾讯 TEG 数据平部 MQ 团队开发部署了一套底层运维指标性能分析系统(本文简称 Data 项目) ,目前作为通用基础设施服务整个腾讯集团。该系统旨在收集性能指标、上报数据以用于业务的运维监控,后续也将延用至前后端实时分析场景。
从大数据到人工智能
2022/09/08
7100
案例推荐|千亿级、大规模:腾讯超大 Apache Pulsar 集群性能调优实践
【12图】你管这破玩意叫Pulsar
这两年pulsar发展比较快,有好多大公司引入了pulsar,相关的资料和课程也多了,今天一起来了解一下pulsar这款中间件。
jinjunzhu
2022/09/23
7780
【12图】你管这破玩意叫Pulsar
构建下一代万亿级云原生消息架构:Apache Pulsar 在 vivo 的探索与实践
作者 | 陈建波、全利民 本文整理自 vivo 互联网大数据工程师陈建波与全利民在 Apache Pulsar Meetup 上的演讲《Apache Pulsar 在 vivo 的探索与实践》,介绍 vivo 在集群管理与监控上应用 Pulsar 的实践。 vivo 移动互联网为全球 4 亿 + 智能手机用户提供互联网产品与服务。其中,vivo 分布式消息中间件团队主要为 vivo 所有内外销实时计算业务提供高吞吐、低延时的数据接入、消息队列等服务,覆盖应用商店、短视频、广告等业务。业务集群已达每天十
深度学习与Python
2023/03/29
7560
构建下一代万亿级云原生消息架构:Apache Pulsar 在 vivo 的探索与实践
Apache BookKeeper 一致性协议解析
Apache Pulsar 是一个多租户、高性能的服务间消息传输解决方案,支持多租户、低延时、读写分离、跨地域复制(GEO replication)、快速扩容、灵活容错等特性。Pulsar 存储层依托于 BookKeeper 组件,所以本文简单探讨一下 BookKeeper(下文简称 BK) 的一致性协议是如何实现的。
腾讯云中间件团队
2025/02/10
1100
Apache BookKeeper 一致性协议解析
Redis、Kafka 和 Pulsar 消息队列对比
导语 | 市面上有非常多的消息中间件,rabbitMQ、kafka、rocketMQ、pulsar、 redis等等,多得令人眼花缭乱。它们到底有什么异同,你应该选哪个?本文尝试通过技术演进的方式,以redis、kafka和 pulsar为例,逐步深入,讲讲它们架构和原理,帮助你更好地理解和学习消息队列。文章作者:刘德恩,腾讯IEG研发工程师。
架构之家
2022/07/12
9650
Redis、Kafka 和 Pulsar 消息队列对比
推荐阅读
相关推荐
云原生消息流系统 Apache Pulsar 在腾讯云的大规模生产实践
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验