我们继续来讲Pulsar存储计算分离架构设计系列,这篇我们来说说负载均衡与分片管理。
在分布式消息系统领域,Apache Pulsar凭借其独特的计算存储分离架构和Bundle分片机制,实现了百万级Topic的精细化负载管理。作为云原生时代的消息流平台,Pulsar通过将Namespace划分为逻辑上的Bundle单元,既避免了以单个Topic为调度粒度的性能瓶颈,又突破了传统Namespace整体迁移的资源消耗问题。这种设计使得Broker集群能够动态感知节点负载,当监测到CPU、内存或带宽利用率不均衡时,自动触发Bundle迁移操作,将高负载节点上的消息流重新分配到空闲节点,整个过程无需人工干预且不影响业务连续性。这种创新的负载均衡机制,正是Pulsar在应对金融、物联网等海量消息场景时保持高可扩展性的核心技术支撑。

Apache Pulsar的NamespaceBundle管理机制是其分布式架构的核心组件,通过NamespaceBundleFactory管理NamespaceName到NamespaceBundle的映射关系,支持动态分裂和合并操作。每个NamespaceBundle代表一个命名空间的分片,通过NamespaceBundles进行组织管理,而NamespaceService作为核心服务协调NamespaceBundleFactory和OwnershipCache来处理Bundle的获取、分裂和所有权管理。OwnershipCache利用分布式锁机制通过LockManager和ResourceLock确保同一时间只有一个Broker拥有特定Bundle的所有权,通过OwnedBundle管理Bundle的激活状态和卸载请求。负载均衡功能由LoadManager和ModularLoadManager实现,结合BundleSplitStrategy监控BundleData和NamespaceBundleStats中的负载指标,当检测到热点Bundle时触发自动分裂流程。整个机制通过ServiceUnitUtils提供工具方法,使用元数据存储系统维护Bundle的持久化状态,确保集群中Bundle分布的均衡性和一致性,同时NamespaceBundleSplitAlgorithm提供不同的分裂算法支持灵活的Bundle分割策略。
3.1 Bundle获取流程

Bundle获取的时序图展示了客户端请求获取特定Topic对应的NamespaceBundle的完整流程。
当客户端发起请求时,首先由NamespaceService接收请求并调用getBundleAsync方法处理。NamespaceService会委托给NamespaceBundleFactory来实际执行Bundle查找操作,NamespaceBundleFactory通过其内部的bundlesCache异步加载对应Namespace的NamespaceBundles数据。如果缓存中没有找到,会从元数据存储中获取Namespace的分片信息并构建NamespaceBundles对象。获取到NamespaceBundles后,根据Topic名称的hash值在分片数组中查找对应的NamespaceBundle。最后将找到的NamespaceBundle逐层返回给客户端,完成整个Bundle获取流程。这个过程充分利用了缓存机制来提高性能,并通过异步操作避免阻塞。
3.2 Bundle分裂流程

Bundle分裂时序图展示了Pulsar中热点NamespaceBundle自动分裂的完整流程。
该流程由负载管理器定期检测触发,首先通过LoadManager检测到某个NamespaceBundle负载过高,如果启用了自动分裂功能,则根据BundleData和NamespaceBundleStats中的统计信息判断是否需要分裂。当确定需要分裂时,NamespaceService调用splitAndOwnBundle方法开始处理,通过NamespaceBundleSplitAlgorithm计算分裂边界点。接着NamespaceBundleFactory执行实际的分裂操作,将一个Bundle分割成两个新的Bundle,并更新元数据存储中的NamespaceBundles信息。分裂完成后,OwnershipCache获取新分裂Bundle的所有权,通过分布式锁机制在元数据存储中创建新的所有权节点。最后,原Bundle被标记为禁用状态,整个分裂流程完成,实现了热点Bundle的自动负载分散。
3.3 Bundle卸载流程

Bundle卸载流程是Pulsar中用于将NamespaceBundle从当前Broker移除并重新分配给其他Broker的重要机制。当管理员通过AdminClient发起卸载请求时,流程开始执行。
首先,AdminClient调用NamespaceService的卸载方法,NamespaceService会从OwnershipCache中获取对应的OwnedBundle实例。然后,NamespaceService调用OwnedBundle的handleUnloadRequest方法开始实际的卸载过程。在OwnedBundle内部,首先会将自身的活跃状态(isActive)设置为false,这样新的请求将不会被接受。
接着,OwnedBundle委托BrokerService执行unloadServiceUnit操作,这一步会关闭该Bundle内的所有Topics,确保没有活跃的生产者或消费者连接。当所有Topics都成功关闭后,BrokerService会通知OwnedBundle卸载完成。随后,OwnedBundle会通知OwnershipCache移除该Bundle的所有权信息。OwnershipCache会与元数据存储系统交互,删除在ZooKeeper中存储的Bundle所有权节点,完成所有权的释放。
最后,当所有权成功移除后,整个卸载流程完成,NamespaceService会向AdminClient返回卸载成功的响应。这个流程确保了Bundle的平滑卸载,避免了数据丢失和服务中断。
4.1 Bundle获取流程

这里主要看一下 Arrays.binarySearch(partitions, hash)这个方法,这种方法确保了无论hash值落在哪个区间,都能正确找到对应的bundle,是Pulsar高效topic-to-bundle映射的关键实现。
举个例子:
在 Arrays.binarySearch(partitions, hash) 中,当未找到精确匹配时返回负值,这是因为:
假设 partitions = [0x00000000, 0x40000000, 0x80000000, 0xffffffff],对应3个bundles:
当 hash = 0x50000000 时:
你有可能会问,老周啊,为啥未找到时返回 -(插入点) - 1 ?
这是因为二分查找算法的底层源码就是这样实现的,算法源码如下:

4.2 Bundle分裂流程
首先看到 org.apache.pulsar.broker.PulsarService#start

接着启动负载管理服务,这里会起一个LoadReportUpdaterTask,默认5秒定期更新负载报告。

该方法在负载管理器启动时执行,主要完成以下初始化工作:



如果需要,执行namespace bundle分裂操作,千呼万唤始出来!!!
/**
* 检测并分裂热点namespace bundles
*
* 该方法根据配置的阈值检测热点bundle,并在必要时进行分裂以分散负载。
*
* 分裂条件:
* 1. bundle中的topic数量超过阈值
* 2. bundle中的会话数(生产者+消费者)超过阈值
* 3. bundle的消息速率超过阈值
* 4. bundle的带宽使用超过阈值
* 5. namespace的bundle总数未达到最大限制
* 6. bundle中包含多个topic
*
* 分裂过程:
* 1. 遍历所有bundle统计信息
* 2. 检查每个bundle是否满足分裂条件
* 3. 如果满足条件且启用了自动分裂,则调用admin client执行分裂操作
* 4. 分裂完成后设置强制更新负载报告标志
*/
@Override
public void doNamespaceBundleSplit() throws Exception {
// 获取配置的分裂阈值
int maxBundleCount = pulsar.getConfiguration().getLoadBalancerNamespaceMaximumBundles();
long maxBundleTopics = pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxTopics();
long maxBundleSessions = pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxSessions();
long maxBundleMsgRate = pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate();
long maxBundleBandwidth =
pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * MBytes;
log.info(
"Running namespace bundle split with thresholds: topics {}, sessions {},"
+ " msgRate {}, bandwidth {}, maxBundles {}",
maxBundleTopics, maxBundleSessions, maxBundleMsgRate, maxBundleBandwidth, maxBundleCount);
// 如果没有负载报告或bundle统计信息,则返回
if (this.lastLoadReport == null || this.lastLoadReport.getBundleStats() == null) {
return;
}
Map<String, NamespaceBundleStats> bundleStats = this.lastLoadReport.getBundleStats();
Set<String> bundlesToBeSplit = new HashSet<>();
// 遍历所有bundle统计信息,检查是否需要分裂
for (Map.Entry<String, NamespaceBundleStats> statsEntry : bundleStats.entrySet()) {
String bundleName = statsEntry.getKey();
NamespaceBundleStats stats = statsEntry.getValue();
long totalSessions = stats.consumerCount + stats.producerCount;
double totalMsgRate = stats.msgRateIn + stats.msgRateOut;
double totalBandwidth = stats.msgThroughputIn + stats.msgThroughputOut;
boolean needSplit = false;
// 检查是否超过各项阈值
if (stats.topics > maxBundleTopics || totalSessions > maxBundleSessions || totalMsgRate > maxBundleMsgRate
|| totalBandwidth > maxBundleBandwidth) {
// 如果只有一个topic,则不进行分裂
if (stats.topics <= 1) {
log.info("Unable to split hot namespace bundle {} since there is only one topic.", bundleName);
} else {
// 检查namespace的bundle数量是否已达上限
NamespaceName namespaceName = NamespaceName
.get(LoadManagerShared.getNamespaceNameFromBundleName(bundleName));
int numBundles = pulsar.getNamespaceService().getBundleCount(namespaceName);
if (numBundles >= maxBundleCount) {
log.info("Unable to split hot namespace bundle {} since the namespace has too many bundles.",
bundleName);
} else {
needSplit = true;
}
}
}
// 如果需要分裂
if (needSplit) {
// 检查是否启用了自动bundle分裂
if (this.getLoadBalancerAutoBundleSplitEnabled()) {
log.info(
"Will split hot namespace bundle {}, topics {}, producers+consumers {},"
+ " msgRate in+out {}, bandwidth in+out {}",
bundleName, stats.topics, totalSessions, totalMsgRate, totalBandwidth);
bundlesToBeSplit.add(bundleName);
} else {
log.info(
"DRY RUN - split hot namespace bundle {}, topics {}, producers+consumers {},"
+ " msgRate in+out {}, bandwidth in+out {}",
bundleName, stats.topics, totalSessions, totalMsgRate, totalBandwidth);
}
}
}
// 如果有待分裂的bundle,则执行分裂操作
if (bundlesToBeSplit.size() > 0) {
for (String bundleName : bundlesToBeSplit) {
try {
// 调用admin client执行namespace bundle分裂
pulsar.getAdminClient().namespaces().splitNamespaceBundle(
LoadManagerShared.getNamespaceNameFromBundleName(bundleName),
LoadManagerShared.getBundleRangeFromBundleName(bundleName),
pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled(), null);
log.info("Successfully split namespace bundle {}", bundleName);
} catch (Exception e) {
log.error("Failed to split namespace bundle {}", bundleName, e);
}
}
// 设置强制更新负载报告标志
this.setLoadReportForceUpdateFlag();
}
}
重点在这个方法 pulsar.getAdminClient().namespaces().splitNamespaceBundle()

上面那个REST API 会请求到 org.apache.pulsar.broker.admin.v2.Namespaces#splitNamespaceBundle这里来:

/**
* 内部分裂命名空间Bundle
*
* 该方法处理命名空间bundle的分裂请求,将一个bundle分裂成多个较小的bundle以实现负载均衡。
*
* @param asyncResponse 异步响应对象,用于返回处理结果
* @param bundleName 需要分裂的bundle名称,可以是具体的bundle范围或特殊值(如"LARGEST_BUNDLE")
* @param authoritative 是否强制执行,如果为true则不检查所有权
* @param unload 分裂后是否卸载bundle
* @param splitAlgorithmName 使用的分裂算法名称
*
* 主要处理流程:
* 1. 权限验证:检查调用者是否具有超级用户权限
* 2. 参数验证:检查bundleName是否为空
* 3. Bundle范围解析:如果bundleName是"LARGEST_BUNDLE",则找到包含最多topic的bundle
* 4. 所有权验证:验证当前broker是否拥有该bundle的所有权
* 5. 算法验证:检查指定的分裂算法是否被支持
* 6. 执行分裂:调用NamespaceService.splitAndOwnBundle执行实际的分裂操作
* 7. 结果处理:成功时返回204状态码,失败时返回相应的错误信息
*
* 异常处理:
* - IllegalArgumentException: 参数错误导致分裂失败,返回412状态码
* - 其他异常: 返回具体的错误信息
*/
protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String bundleName,
boolean authoritative, boolean unload, String splitAlgorithmName) {
// 验证调用者是否具有超级用户权限
validateSuperUserAccess();
// 检查bundleName参数是否为空
checkNotNull(bundleName, "BundleRange should not be null");
log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleName);
// 如果bundleName是"LARGEST_BUNDLE",则找到包含最多topic的bundle
String bundleRange = bundleName.equals(Policies.LARGEST_BUNDLE)
? findLargestBundleWithTopics(namespaceName).getBundleRange()
: bundleName;
// 获取命名空间策略
Policies policies = getNamespacePolicies(namespaceName);
// 验证全局命名空间所有权或集群所有权
if (namespaceName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
} else {
validateClusterOwnership(namespaceName.getCluster());
validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster());
}
// 验证策略是否为只读
validatePoliciesReadOnlyAccess();
// 检查指定的分裂算法是否被支持
List<String> supportedNamespaceBundleSplitAlgorithms =
pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
if (StringUtils.isNotBlank(splitAlgorithmName)
&& !supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Unsupported namespace bundle split algorithm, supported algorithms are "
+ supportedNamespaceBundleSplitAlgorithms));
}
NamespaceBundle nsBundle;
try {
// 验证命名空间bundle所有权
nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
authoritative, true);
} catch (Exception e) {
asyncResponse.resume(e);
return;
}
// 执行bundle分裂操作
pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName))
.thenRun(() -> {
log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString());
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
if (ex.getCause() instanceof IllegalArgumentException) {
log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), namespaceName,
bundleRange, ex.getMessage());
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Split bundle failed due to invalid request"));
} else {
log.error("[{}] Failed to split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, ex);
asyncResponse.resume(new RestException(ex.getCause()));
}
returnnull;
});
}
继续跟进splitAndOwnBundle方法,发现前面的都是些开胃菜,真正的分裂逻辑在这个splitAndOwnBundleOnceAndRetry方法里
/**
* 尝试分裂并拥有一个命名空间束(Bundle),如果失败则重试
*
* 该方法负责执行命名空间束的分裂操作,包括获取分裂点、创建新的束、更新元数据以及处理重试逻辑。
*
* @param bundle 需要分裂的命名空间束
* @param unload 分裂完成后是否卸载新生成的束
* @param counter 重试计数器,用于控制重试次数
* @param completionFuture 用于通知操作完成的CompletableFuture
* @param splitAlgorithm 用于确定分裂点的算法
*
* 主要流程:
* 1. 使用指定算法获取分裂边界点
* 2. 根据边界点将原束分裂成两个新束
* 3. 尝试获取新束的所有权
* 4. 更新命名空间的束信息到元数据存储
* 5. 如果更新成功,则禁用原束并完成操作
* 6. 如果更新失败且是版本冲突导致的,则重试
* 7. 根据unload参数决定是否卸载新束
*
* 重试机制:
* - 最多重试 BUNDLE_SPLIT_RETRY_LIMIT (7) 次
* - 仅在遇到 MetadataStoreException.BadVersionException 时重试
* - 其他异常直接完成completionFuture并报告错误
*
* 异常处理:
* - IllegalArgumentException: 直接完成completionFuture并报告错误
* - BadVersionException: 重试直到达到最大重试次数
* - 其他异常: 完成completionFuture并报告ServiceUnitNotReadyException
*/
void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
boolean unload,
AtomicInteger counter,
CompletableFuture<Void> completionFuture,
NamespaceBundleSplitAlgorithm splitAlgorithm) {
// 使用指定算法获取分裂边界点
splitAlgorithm.getSplitBoundary(this, bundle).whenComplete((splitBoundary, ex) -> {
CompletableFuture<List<NamespaceBundle>> updateFuture = new CompletableFuture<>();
if (ex == null) {
try {
// 根据边界点将原束分裂成两个新束
bundleFactory.splitBundles(bundle,
2/* 默认分裂成2个 */, splitBoundary)
.thenAccept(splittedBundles -> {
// 分裂并更新命名空间束信息,更新可能因Zookeeper并发写入而失败
if (splittedBundles == null) {
String msg = format("bundle %s not found under namespace", bundle.toString());
LOG.warn(msg);
updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
return;
}
// 。。。
}
不好意思各位,真正的核心中的核心分裂逻辑在这里,后面真没有了。
/**
* 将指定的命名空间束(NamespaceBundle)分裂成多个较小的束
*
* 该方法根据给定的分裂点或数量将一个命名空间束分割成多个新的束,并返回包含新束集合的NamespaceBundles对象。
*
* @param targetBundle 需要被分裂的目标命名空间束
* @param argNumBundles 指定分裂成的束数量(当splitBoundary为null时使用)
* @param splitBoundary 指定的分裂边界点(可选,如果提供则将束分裂成两个部分)
* @return CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>>
* 返回包含更新后的NamespaceBundles和新分裂出的NamespaceBundle列表的异步结果
*
* 主要流程:
* 1. 验证目标束是否可以被分裂(必须有足够大的范围)
* 2. 如果提供了分裂边界点,则验证该点在束范围内,并将分裂数量设为2
* 3. 从缓存中获取命名空间当前的束配置
* 4. 在目标束的位置插入新的分割点,创建新的分区数组
* 5. 根据是否提供分裂边界点计算分割大小
* 6. 创建新的NamespaceBundles对象和分裂出的束列表
*
* 特殊处理:
* - 如果splitBoundary不为null,则忽略argNumBundles参数,强制分裂成2个束
* - 新创建的NamespaceBundles会保持原始束的版本信息
* - 只返回新分裂出的束列表,不包括原有的其他束
*/
public CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>> splitBundles(
NamespaceBundle targetBundle, int argNumBundles, Long splitBoundary) {
// 验证目标束是否可以进一步分裂
checkArgument(canSplitBundle(targetBundle), "%s bundle can't be split further", targetBundle);
// 如果提供了分裂边界点,验证其有效性并设置分裂数量为2
if (splitBoundary != null) {
checkArgument(splitBoundary > targetBundle.getLowerEndpoint()
&& splitBoundary < targetBundle.getUpperEndpoint(),
"The given fixed key must between the key range of the %s bundle", targetBundle);
argNumBundles = 2;
}
// 参数验证
checkNotNull(targetBundle, "can't split null bundle");
checkNotNull(targetBundle.getNamespaceObject(), "namespace must be present");
NamespaceName nsname = targetBundle.getNamespaceObject();
finalint numBundles = argNumBundles;
// 从缓存中获取命名空间当前的束配置并进行分裂操作
return bundlesCache.get(nsname).thenApply(sourceBundle -> {
// 创建新的分区数组,长度为原数组长度加上新增的分区数减1
finalint lastIndex = sourceBundle.partitions.length - 1;
finallong[] partitions = newlong[sourceBundle.partitions.length + (numBundles - 1)];
int pos = 0;
int splitPartition = -1;
final Range<Long> range = targetBundle.getKeyRange();
// 遍历原分区数组,找到目标束并进行分裂
for (int i = 0; i < lastIndex; i++) {
// 判断当前分区是否为目标束的起始分区
if (sourceBundle.partitions[i] == range.lowerEndpoint()
&& (range.upperEndpoint() == sourceBundle.partitions[i + 1])) {
splitPartition = i;
long maxVal = sourceBundle.partitions[i + 1];
long minVal = sourceBundle.partitions[i];
// 计算每个新束的大小
long segSize = splitBoundary == null ? (maxVal - minVal) / numBundles : splitBoundary - minVal;
// 插入新的分区点
partitions[pos++] = minVal;
long curPartition = minVal + segSize;
for (int j = 0; j < numBundles - 1; j++) {
partitions[pos++] = curPartition;
curPartition += segSize;
}
} else {
// 非目标束直接复制
partitions[pos++] = sourceBundle.partitions[i];
}
}
// 设置最后一个分区点
partitions[pos] = sourceBundle.partitions[lastIndex];
// 如果成功找到并分裂了目标束
if (splitPartition != -1) {
// 创建新的NamespaceBundles对象,保持原始版本信息
NamespaceBundles splitNsBundles =
new NamespaceBundles(nsname, this, sourceBundle.getLocalPolicies(), partitions);
// 获取新分裂出的束列表
List<NamespaceBundle> splitBundles = splitNsBundles.getBundles().subList(splitPartition,
(splitPartition + numBundles));
// 返回新的NamespaceBundles和分裂出的束列表
returnnew ImmutablePair<>(splitNsBundles, splitBundles);
}
returnnull;
});
}
4.3 Bundle卸载流程

主要看 handleUnloadRequest 方法逻辑:

这段源码有几个细节老周想强调一下:
1、使用tryLock避免死锁
这样设计有几个好处:
2、本地设置标志 isActive 用的 AtomicIntegerFieldUpdater
private static final AtomicIntegerFieldUpdater<OwnedBundle> IS_ACTIVE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(OwnedBundle.class, "isActive");
private volatile int isActive = TRUE;
你用肯呢个会问,不能直接对isActive设置标识码?为啥要用AtomicIntegerFieldUpdater原子类来更新isActive属性?这是因为volatile只能保证内存可见性,不能保证原子性。
所以这里用AtomicIntegerFieldUpdater.compareAndSet 方法提供了原子性的比较和设置操作,确保在多线程环境下状态变更的安全性。