IndividualDeletedMessages 是 Pulsar 中 shared 和 key_shared 订阅模式中订阅进度的一部分,一般也称为消息空洞。
我们都知道在 Pulsar 中,消费进度由两部分组成:
MarkDeletePosition:消费进度初始位置,此位置以及之前的消息,被认为已经全部消费过。等同于 Kafka 中的 offset。
IndividualDeletedMessages:消息确认集合,也可以称为消息空洞。在 MarkDeletePosition 之后,确认过的消息位置的集合。
订阅进度概念
以消息消费进度图为例:
MarkDeletePosition = 1:2
IndividualDeletedMessages = [ (1:4 - 1:6], (1:7 - 1:8] ]
这里出现消息空洞,即确认消息不连续的的情况有很多,常见的如下:
1. shared 或者 key_shared 消费模式下,某些消息消费的快,某些消息消费慢,很容易出现空洞的情况。
2. 主题中存在延迟消息的情况,上面 1:3、1:4、1:7 等位置的消息可能还没有到达延迟时间,但是 1:5、1:6、1:8 等消息已经达到延迟时间,这个时候也会出现空洞的情况。
消息空洞和 unack 的区别
unack 表示服务端已经推送消息到消费者的内存队列中,但是消费者并没有返回服务端消息确认(即没有调用 consumer.acknowledge 相关接口)。(常见的情况包括:消费慢、消费逻辑阻塞导致的 unack 等。)
例如消息消费进度图中:消息 1:4 和 1:10,是服务端推送给消费者,但是消费没有 ACK 的消息,这些消息会被计入到 unack 消息中。
说明:
综上所述,消息空洞和 unack 是两个不同的概念,两者没有直接关系。不过,unack 某些情况下会产生消息空洞(例如消息消费进度图的场景中)。
数量限制背景
订阅进度中的 IndividualDeletedMessages 部分需要定期持久化,持久化的过程是将 IndividualDeletedMessages 对象序列化成字符串,作为一条内部消息,存储到 Bookie 集群中,以便在主题重新加载后可以获取到完整的订阅进度信息。
在 Pulsar 中,存储的 IndividualDeletedMessages 的数量存在限制,是通过集群维度的配置项 managedLedgerMaxUnackedRangesToPersist 指定的,默认10000。当 IndividualDeletedMessages 的数量超过 1 万,只会持久化 IndividualDeletedMessages 中最前面的 1 万个,IndividualDeletedMessages 超过 1 万的部分会丢失。
限制的主要原因主要归结于以下两个方面:
1. 如果不限制,当 IndividualDeletedMessages 过大的时候,内部消息长度会变大,消息队列并不适合处理过大的消息。这样,内部订阅进度产生的消息会给集群带来过大的压力,甚至影响整个集群的稳定性。
2. IndividualDeletedMessages 也是作为消息存储到 bookie 中,也需要遵循消息最大长度限制(5M)。根据之前测试经验,当 IndividualDeletedMessages 的数量超过20w,生成的消息长度就会达到5M。
消息空洞的产生和实践教程
IndividualDeletedMessages 的产生有以下4种场景。其中延迟消息和 key_shared 订阅可能会产生大量的 IndividualDeletedMessages,这两个场景下,更容易出现进度丢失的情况。
场景1:未确认消息
订阅模式:shared、key_shared
订阅进度:
MarkDeletePosition = 1:2
IndividualDeletedMessages = [ (1:4 - 1:6], (1:7 - 1:8] ]
IndividualDeletedMessages 的产生和消费行为相关,不同消息消费耗时不同。
服务端推送消息给客户端,每个消息的消费耗时不一致,可能出现较晚生产的消息先消费完成的情况。这个时候,会导致 IndividualDeletedMessages 的数量的增加。
IndividualDeletedMessages 最多不会超过最大未确认消息数量(默认5000)。
结论:IndividualDeletedMessages 不能超过最大未确认消息数量(默认5000),通常无 IndividualDeletedMessages 超过managedLedgerMaxUnackedRangesToPersist 的风险。
场景2:延迟消息
订阅模式:shared、key_shared
订阅进度:
MarkDeletePosition = 1:2
IndividualDeletedMessages = [ (1:4 - 1:6], (1:7 - 1:8] ]
IndividualDeletedMessages 数量和延迟消息在队列中的分布情况有关系。
当读取消息的时候,发现消息是延迟消息,并且没有到达延迟时间,对应位置的消息无法推送到消费者。当后面的消息延迟时间先于前面发送的消息延迟时间达到,后面生产的消息会先消费完成。这个时候,会导致 IndividualDeletedMessages 的数量的增加。
如果产生的空洞超过 1万个,当主题重新加载的时候,就会存在订阅进度部分丢失的情况。
结论:IndividualDeletedMessages 可能超过 managedLedgerMaxUnackedRangesToPersist。
实践教程
如何降低(控制)消息空洞的数量,(根据上面的介绍)在了解空洞(IndividualDeletedMessages)产生的原理后,下面的建议通常可以减少空洞数量,或者避免消息空洞引起的重复消息:
1. 消费端做好幂等,以此作为兜底方案。
2. 将「延迟消息」和「非延迟消息」分别放到不同的主题。
3. 尽量让延迟消息的延迟时间保持一定的顺序性(最好是递增)【需要结合实际的业务场景】。
4. 尽量让相同的延迟时间的消息放到一起发送【需要结合实际业务场景】。
5. 扩容分区,同时做好分区间数据均衡。
场景3:key_shard 订阅
订阅模式:key_shared
订阅进度:
MarkDeletePosition = 1:2
IndividualDeletedMessages = [ (1:4 - 1:6], (1:7 - 1:8] ]
IndividualDeletedMessages 数量和消息的推送情况相关。
当某些 key 的数量过多或者消费过慢,consumer 无法接收更多对应 key 的消息时,服务端读取到对应 key 的消息后,无法推送给客户端。如果后面发送的消息可以投递给消费者,并且先于前面的消息消费完成。这个时候,会导致 IndividualDeletedMessages 的数量的增加。
如果产生的空洞超过 1w 个,当主题重新加载的时候,就会存在订阅进度部分丢失的情况。
结论:IndividualDeletedMessages 可能超过 managedLedgerMaxUnackedRangesToPersist。
实践教程
1. 消费端做好幂等。
2. 做好消费能力的快速提升(横向扩容能力),尽量避免出现消息堆积的情况【这一点和官网对于 key_shared 的订阅模式的实践教程一致】。
3. Key 数量多且每个 Key 的消息分布相对均匀,避免由于部分 key 数据倾斜或者处理慢造成堆积。
场景4:Exclusive\\Failover 订阅
订阅模式:Exclusive、Failover
订阅进度:
MarkDeletePosition = 1:2
IndividualDeletedMessages = [ (1:4 - 1:6], (1:7 - 1:8] ]
和场景1【未确认消息】的区别是,Exclusive 和 Failover 订阅模式下,服务端不统计 unack 消息,consumer 只要 receive 消息,服务端就可以继续推送后续的消息。所以,如果有消息没有确认,会导致 IndividualDeletedMessages 增加。
结论:IndividualDeletedMessages 可能超过 managedLedgerMaxUnackedRangesToPersist。
实践教程
1. 推荐尽量使用累积确认(acknowledgeCumulative)的方式,可以避免产生空洞(IndividualDeletedMessages)。
2. 如果使用的是单条确认(acknowledge)的方式。
消费端做好幂等。
做好分区间消息数量的均衡。
做好消费能力的快速提升(横向扩容能力),尽量避免出现消息堆积的情况。
3. 如果是 Failover 订阅模式,通过扩容分区,可能缓解。