我们前面的Pulsar存储计算分离架构设计系列已经介绍过Broker无状态、存储层BookKeeper的文章了,这篇我们主要来说下元数据管理。
在分布式系统的设计哲学中,元数据管理如同人体的神经系统,其架构选择直接决定了系统的扩展性、可靠性与性能边界。Apache Pulsar作为云原生时代的消息中间件标杆,其元数据管理架构的演进历程恰是分布式技术发展史的微观缩影——从早期单一依赖ZooKeeper的轻量级协调,到如今支持etcd、RocksDB等多存储引擎的混合架构,这一跃迁背后既有技术迭代的必然性,也有应对万亿级消息场景的实践智慧。
回溯至2.9.0版本之前,Pulsar的元数据管理仅提供两种实现路径:基于ZooKeeper的分布式协调服务,以及仅适用于单机测试场景的本地内存存储。这一阶段的设计充分体现了其设计初衷:通过ZooKeeper的强一致性保障集群拓扑、主题分区等关键元数据的可靠同步,同时利用本地内存实现快速原型验证。然而,随着Pulsar在企业级场景中承载万亿级消息处理,单一架构的局限性逐渐显现——ZooKeeper的写性能瓶颈、内存存储的易失性缺陷,均成为规模化部署的隐性成本。老周这里多嘴一下,我分析过很多中间件,发展到了一定阶段都会移除ZooKeeper,大家组件选型的时候要注意一下。
技术演进的转折点出现在2.10.0版本,etcd与RocksDB的引入标志着Pulsar元数据管理进入多引擎并存的新纪元。这一变革绝非简单的功能叠加,而是对分布式系统核心命题的重新思考:当ZooKeeper的CP特性无法满足高吞吐需求时,etcd的Raft算法提供了更优的写性能与选举效率;当元数据规模突破内存限制时,RocksDB的LSM树结构则实现了磁盘级的高效键值存储。这种架构的多元化,既是对云原生时代基础设施多样化的适配,更是Pulsar从"能用"走向"好用"的关键一跃。

Apache Pulsar的metadata模块采用了清晰的分层架构设计,为核心元数据存储和分布式协调服务提供了统一的抽象接口。该模块以MetadataStore接口作为核心入口点,定义了基本的键值存储操作,包括数据读取(get)、写入(put)、删除(delete)以及子节点查询(getChildren)等基础功能。通过MetadataStoreExtended接口扩展了更多高级特性,如支持创建选项和会话监听机制。
在实现层面,模块采用了抽象基类AbstractMetadataStore提供通用实现,具体实现了基于ZooKeeper的ZKMetadataStore、内存存储的LocalMemoryMetadataStore以及用于测试的FaultInjectionMetadataStore等多种存储后端。这种设计使得上层应用可以透明地切换不同的存储实现,同时保证了系统的灵活性和可测试性。
为了提升性能,模块集成了基于缓存的MetadataCache机制,通过MetadataCacheImpl实现对热点数据的缓存管理,有效减少了对底层存储系统的直接访问。此外,模块还提供了完整的分布式协调服务功能,通过CoordinationService接口及其实现类CoordinationServiceImpl,为上层应用提供了领导者选举(LeaderElection)、分布式锁(LockManager)和计数器等重要的分布式协调原语,这些功能对于构建高可用的分布式系统至关重要。
3.1 读取数据流程

3.2 写入数据流程

3.3 分布式锁获取流程

可以看下这个测试类,org.apache.pulsar.metadata.MetadataStoreExtendedTest#sequentialKeys

4.1 创建MetadataStoreExtended

4.2 put操作
调用 storePut 抽象方法,交给不同的存储层去实现。

我们这里挑ZKMetadataStore来说一下:

ZKMetadataStore类中的storePut方法是Pulsar元数据存储模块的核心写入实现,负责处理ZooKeeper中节点的创建和更新操作。该方法通过版本控制机制和多种创建选项,提供了灵活且安全的数据写入功能。
4.3 EtcdMetadataStore
老周分析的代码版本是2.9.1,只有ZKMetadataStore和LocalMemoryMetadataStore,这里老周扩展一下特地拉了最新分支的代码,现在还支持EtcdMetadataStore、OxiaMetadataStore、RocksdbMetadataStore。
这里老周挑一个EtcdMetadataStore的源码来讲。
和之前的想比,这里扩展了一个批量读取的抽象类出来AbstractBatchedMetadataStore,继承了之前的AbstractMetadataStore抽象类,然后提供队列支持批量读写的能力。
EtcdMetadataStore的读写操作时序图:

etcd 批量操作的核心流程如下:客户端首先调用 AbstractBatchedMetadataStore 中的 storePut、storeGet 等方法发起元数据操作请求,AbstractBatchedMetadataStore 会将这些操作封装成对应的 OpPut、OpGet 等对象,然后通过 enqueue 方法将操作添加到相应的操作队列(readOps 或 writeOps)中。当队列中的操作数量达到阈值或者定时刷新任务(FlushTask)触发时,系统会调用 internalBatchOperation 方法来处理批量操作,该方法最终会调用 EtcdMetadataStore 实现的 batchOperation 抽象方法,在这个方法中构建针对 etcd 的事务请求并执行,最后将结果通过 CompletableFuture 返回给客户端,整个过程通过队列缓冲和批量处理机制有效提升了与 etcd 交互的性能和效率。
源码可以看这里:
org.apache.pulsar.metadata.impl.batching.AbstractBatchedMetadataStore#enqueue
private void enqueue(MessagePassingQueue<MetadataOp> queue, MetadataOp op) {
// 检查存储是否已关闭,如果已关闭则直接完成操作并抛出异常
if (isClosed()) {
MetadataStoreException ex = new MetadataStoreException.AlreadyClosedException();
op.getFuture().completeExceptionally(ex);
return;
}
// 如果启用了批处理功能
if (enabled) {
// 尝试将操作添加到队列中,如果添加失败则立即单独执行该操作
if (!queue.offer(op)) {
// Execute individually if we're failing to enqueue
internalBatchOperation(Collections.singletonList(op));
return;
}
// 当队列大小超过最大操作数且当前没有正在进行的刷新操作时,触发一次刷新操作
if (queue.size() > maxOperations && flushInProgress.compareAndSet(false, true)) {
executor.execute(this::flush);
}
} else {
// 如果未启用批处理功能,则直接执行单独操作
internalBatchOperation(Collections.singletonList(op));
}
}
主要看queue.offer那里,往队列里放值,成功入队的话返回true,队列满了的话则返回false。立即单独执行该操作。

这个设计有点意思,第一避免因队列满而导致的阻塞或拒绝服务,第二单个操作的执行比等待队列空间更快速。这种设计在保证批处理性能的同时,也确保了系统的可靠性和操作的不丢失。
然后另一个核心就是flush方法:

好了,元数据管理就说到这里,我们下一篇来说一下负载均衡与分片管理。