首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Pulsar存储计算分离架构设计之消息副本同步和故障转移机制

Pulsar存储计算分离架构设计之消息副本同步和故障转移机制

作者头像
老周聊架构
发布2025-11-19 15:18:18
发布2025-11-19 15:18:18
420
举报

一、前言

我们继续来讲Pulsar存储计算分离架构设计系列,这篇我们来说说消息副本和故障转移机制。

在分布式消息系统的高可用性架构中,消息副本机制与故障转移能力是确保数据可靠性和服务连续性的核心支柱,基本上每款消息中间件都有消息副本机制与故障转移能力,包括但不限于Kafka、RocketMQ、RabbitMQ。

Apache Pulsar通过多副本存储和智能故障转移机制,构建了多层次容错体系:其基于BookKeeper的持久化存储层采用多副本(通常3副本)机制,将消息数据分散存储在独立的Bookie节点上,通过Quorum协议保证数据强一致性;计算层Broker则通过ZooKeeper协调实现故障自动检测与流量切换,当主节点宕机时,备用节点能在秒级内接管服务,确保生产者/消费者的连接不中断。这种存储与计算解耦的架构设计,使Pulsar既能避免单点故障导致的数据丢失,又能实现业务无感知的故障恢复,特别适合金融交易、物联网等对可靠性要求严苛的场景。

二、核心类图

Pulsar的持久化复制机制核心流程如下:PersistentTopic作为消息处理的中心枢纽,通过ManagedLedger接口管理消息的持久化存储,当需要跨集群复制时,PersistentReplicator组件会创建与远程集群的连接并使用ProducerImpl发送消息。

复制流程从ManagedCursor读取Entry消息开始,经过MessageImpl反序列化处理,通过ProducerImpl的sendAsync方法异步发送到目标集群,发送完成后通过回调机制更新游标的消费位置并触发下一批消息的读取,形成了一个完整的复制闭环。整个过程中DispatchRateLimiter负责控制复制速率防止网络拥塞,而PersistentMessageExpiryMonitor则负责处理消息过期逻辑,确保系统资源的有效利用。

三、核心操作时序图

Pulsar消息副本和故障转移机制的核心流程如下:

  • 消息发布与本地存储:客户端向本地 PersistentTopic 发布消息,消息首先被持久化存储到本地 ManagedLedger 中,存储完成后向客户端返回确认。
  • 消息复制流程:PersistentReplicator 通过 ManagedCursor 从本地 ManagedLedger 中读取待复制的消息条目,然后通过 ProducerImpl 将消息异步发送到远程集群,远程集群将其存储到自己的 ManagedLedger 中并返回确认。
  • 复制状态更新:消息成功复制到远程集群后,PersistentReplicator 会更新本地 ManagedCursor 的位置,标记已复制的消息,避免重复复制。
  • 故障处理与恢复:当与远程集群的连接失败时,PersistentReplicator 会检测到连接异常并进行故障处理,包括重试机制,在连接恢复后重新建立连接并从上次中断的位置继续复制流程。

整个机制通过异步非阻塞的方式实现高效的消息复制,同时具备故障检测和自动恢复能力,确保消息在跨集群环境中的可靠传输。

四、源码分析

4.1 消息副本同步机制

当Entry成功添加到ManagedLedger后,会通过AddEntryCallback的addComplete方法回调通知调用方。

在这里插入图片描述
在这里插入图片描述

这是通过CountDownLatch机制同步等待异步操作完成的结果,这里的逻辑对应上面时序图中的前两步。

我们这里主要来看下PersistentReplicator读取消息的机制,对应的时序图中的Replicator。

PersistentTopic中的addProducer重写了父类AbstractTopic的addProducer,其它的子类没有重新,这是因为PersistentTopic有以下两个功能要保障。

在这里插入图片描述
在这里插入图片描述
代码语言:javascript
复制
public class Policies {

    // 需要复制的集群信息
    public Set<String> replication_clusters = new HashSet<>();

  // ...

}
在这里插入图片描述
在这里插入图片描述

核心方法 readMoreEntries:

在这里插入图片描述
在这里插入图片描述

readMoreEntries 方法是 PersistentReplicator 中控制消息复制流程的核心实现,主要负责根据系统状态和资源配置动态调整消息读取策略。该方法首先通过 getAvailablePermits() 计算当前可读取的消息数量,考虑了生产者队列容量和限流设置等因素。如果允许读取(availablePermits > 0),则根据可用许可和 readBatchSize 确定实际读取消息数,并通过原子操作 HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE) 确保同一时间只有一个读取操作在进行,避免重复读取。当生产者不可写时,会将读取批量减小到1条消息以实现流量控制。如果触发限流(返回-1),则延迟 MESSAGE_RATE_BACKOFF_MS 毫秒后重新尝试读取。最终通过 cursor.asyncReadEntriesOrWait() 异步从本地 ManagedLedger 读取消息,读取完成后回调 readEntriesComplete 方法进行处理,形成完整的复制循环。

接着看这个readEntriesComplete方法,上面readMoreEntries完成后,会回掉到readEntriesComplete方法里来。

org.apache.pulsar.broker.service.persistent.PersistentReplicator#readEntriesComplete

代码语言:javascript
复制
public void readEntriesComplete(List<Entry> entries, Object ctx) {
    if (log.isDebugEnabled()) {
        log.debug("[{}][{} -> {}] Read entries complete of {} messages", topicName, localCluster, remoteCluster,
                entries.size());
    }

    int maxReadBatchSize = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize();
    if (readBatchSize < maxReadBatchSize) {
        int newReadBatchSize = Math.min(readBatchSize * 2, maxReadBatchSize);
        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Increasing read batch size from {} to {}", topicName, localCluster,
                    remoteCluster, readBatchSize, newReadBatchSize);
        }

        readBatchSize = newReadBatchSize;
    }

    readFailureBackoff.reduceToHalf();

    boolean atLeastOneMessageSentForReplication = false;
    boolean isEnableReplicatedSubscriptions =
            brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();

    try {
        // This flag is set to true when we skip atleast one local message,
        // in order to skip remaining local messages.
        boolean isLocalMessageSkippedOnce = false;
        for (int i = 0; i < entries.size(); i++) {
            Entry entry = entries.get(i);
            int length = entry.getLength();
            ByteBuf headersAndPayload = entry.getDataBuffer();
            MessageImpl msg;
            try {
                msg = MessageImpl.deserializeSkipBrokerEntryMetaData(headersAndPayload);
            } catch (Throwable t) {
                log.error("[{}][{} -> {}] Failed to deserialize message at {} (buffer size: {}): {}", topicName,
                        localCluster, remoteCluster, entry.getPosition(), length, t.getMessage(), t);
                cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                entry.release();
                continue;
            }

            if (isEnableReplicatedSubscriptions) {
                checkReplicatedSubscriptionMarker(entry.getPosition(), msg, headersAndPayload);
            }

            if (msg.isReplicated()) {
                // Discard messages that were already replicated into this region
                cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                entry.release();
                msg.recycle();
                continue;
            }

            if (msg.hasReplicateTo() && !msg.getReplicateTo().contains(remoteCluster)) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{} -> {}] Skipping message at position {}, replicateTo {}", topicName,
                            localCluster, remoteCluster, entry.getPosition(), msg.getReplicateTo());
                }
                cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                entry.release();
                msg.recycle();
                continue;
            }

            if (msg.isExpired(messageTTLInSeconds)) {
                msgExpired.recordEvent(0/* no value stat */);
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{} -> {}] Discarding expired message at position {}, replicateTo {}", topicName,
                            localCluster, remoteCluster, entry.getPosition(), msg.getReplicateTo());
                }
                cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                entry.release();
                msg.recycle();
                continue;
            }

            if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) {
                // The producer is not ready yet after having stopped/restarted. Drop the message because it will
                // recovered when the producer is ready
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{} -> {}] Dropping read message at {} because producer is not ready", topicName,
                            localCluster, remoteCluster, entry.getPosition());
                }
                isLocalMessageSkippedOnce = true;
                entry.release();
                msg.recycle();
                continue;
            }

            dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(1, entry.getLength()));

            // Increment pending messages for messages produced locally
            PENDING_MESSAGES_UPDATER.incrementAndGet(this);

            msgOut.recordEvent(headersAndPayload.readableBytes());

            msg.setReplicatedFrom(localCluster);

            headersAndPayload.retain();

            getSchemaInfo(msg).thenAccept(schemaInfo -> {
                msg.setSchemaInfoForReplicator(schemaInfo);
                producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
            }).exceptionally(ex -> {
                log.error("[{}][{} -> {}] Failed to get schema from local cluster", topicName,
                        localCluster, remoteCluster, ex);
                returnnull;
            });

            atLeastOneMessageSentForReplication = true;
        }
    } catch (Exception e) {
        log.error("[{}][{} -> {}] Unexpected exception: {}", topicName, localCluster, remoteCluster, e.getMessage(),
                e);
    }

    HAVE_PENDING_READ_UPDATER.set(this, FALSE);

    if (atLeastOneMessageSentForReplication && !isWritable()) {
        // Don't read any more entries until the current pending entries are persisted
        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Pausing replication traffic. at-least-one: {} is-writable: {}", topicName,
                    localCluster, remoteCluster, atLeastOneMessageSentForReplication, isWritable());
        }
    } else {
        readMoreEntries();
    }
}

readEntriesComplete 方法是 PersistentReplicator 中处理从本地集群读取消息完成后的核心回调方法。当从本地 ManagedLedger 读取到消息后,该方法会遍历所有读取到的 Entry 条目,对每条消息进行反序列化和一系列过滤检查,包括检查是否为已复制的消息、是否应该复制到目标集群、是否过期等。对于需要复制的消息,会通过 producer.sendAsync 方法异步发送到远程集群,发送时会附带 ProducerSendCallback 回调来处理发送结果。成功发送到远程集群的消息会被标记删除,避免重复复制,同时根据发送状态和系统负载情况决定是否继续读取更多消息进行复制,从而形成完整的消息复制循环流程。

这样就完成了消息副本的同步机制了!!!

4.2 故障转移机制

当与远程集群的连接失败时,PersistentReplicator 会检测到连接异常并进行故障处理,包括重试机制,在连接恢复后重新建立连接并从上次中断的位置继续复制流程。

4.2.1 ProducerSendCallback.sendComplete 方法中处理发送异常的情况

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

当发送消息到远程集群失败时,会记录错误日志并将游标回退,以便在连接恢复后重新发送消息。

4.2.2 readEntriesFailed 方法处理读取条目失败的情况

4.2.3 readMoreEntries 方法中的重试机制

之前上面分析过,我这里就不重复了。

代码语言:javascript
复制
 } else if (availablePermits == -1) {
     // no permits from rate limit
     topic.getBrokerService().executor().schedule(
         () -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
 }

当遇到限流等情况时,会安排延迟重试。

4.2.4 ProducerImpl 内部的连接管理和重试机制

ProducerImpl 内部的连接管理和重试机制主要通过 ClientCnx 对象管理与 broker 的连接,使用 getState() 方法检查生产者状态(如 Ready、Connecting、Closing、Closed 等),并在连接断开时触发 connectionClosed() 回调进行重连。在 sendAsync() 方法中,当发送失败时会检查是否需要重试,并使用 pendingMessages 队列暂存待发送消息,通过 grabCnx() 方法获取或重新建立连接。连接恢复处理通过 reconnectLater() 方法实现延迟重连逻辑,使用 backoff 策略控制重连间隔,并通过 connectionOpened() 回调处理连接成功建立后的操作。消息确认和重传机制通过 ackReceived() 处理 broker 的确认响应,对于未确认的消息,在重连后会重新发送,并使用 pendingMessages 队列跟踪待确认消息,确保了 ProducerImpl 在网络波动或 broker 重启等异常情况下能够自动恢复连接并保证消息的可靠传输。

以上四点的机制共同确保了在与远程集群连接失败时能够检测到异常并进行适当的故障处理和重试。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-11-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 老周聊架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、前言
  • 二、核心类图
  • 三、核心操作时序图
  • 四、源码分析
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档