在Spark3.2中引入了领英设计的一种新的shuffle方案,今天我们先来了解下其大致的设计原理,之后会再分析其具体的代码实现。
当我们在Yarn上部署Spark时,通常会使用ESS来管理shuffle数据(具体可见什么是ESS的文章)。我们先来回顾下基于ESS进行shuffle的过程。
然而在实践中任然存在些问题,使得spark任务的稳定性不高。
1
PBS主要结构和流程:
几个重要的特性:
push-merge 的根本目的是减少reduce侧的随机IO, 在Magnet上把小文件block合并后, 将随机IO转变为顺序IO。reduce task可以读取连续存储的、大小在MB级别的文件。
为了解决map端的小文件问题,提高磁盘 I/O 效率,我们需要增加每次 I/O 操作的数据量。这里提出了采用合并属于同一个 Shuffle 分区的 Shuffle block 块,以创建更大的数据块的方式。
下面我们来详细解释下:
首先,push-merge的基本单位是chunk,map task输出block后,首先要将block以算法的方式分配到chunk中去。
这里的算法的简单思想就是将block块合并为chunk,当chunk的长度超过超限之后又push到magent上的过程。具体的实现在方法ShuffleBlockPusher.prepareBlockPushRequests方法中:
for (reduceId <- 0 until numPartitions) {
val blockSize = partitionLengths(reduceId)
if (blockSize > 0) {
// [1] 通过以下公式,更新一下merge service机器编号,把chunk发送到下一台机器上
val mergerId = math.min(math.floor(reduceId * 1.0 / numPartitions * numMergers),
numMergers - 1).asInstanceOf[Int]
// [2] 当chunk长度没有超过限制maxBlockSizeToPush,将block append到chunk中,更新chunk长度
// service, and does not go beyond existing limitations.
if (currentReqSize + blockSize <= maxBlockBatchSize
&& blocks.size < maxBlocksInFlightPerAddress
&& mergerId == currentMergerId && blockSize <= maxBlockSizeToPush) {
// Add current block to current batch
currentReqSize += blockSize.toInt
// [3] 当chunk长度超过限制,将chunk推到编号为currentMergerId的Magnet机器上,之后写入新的block进去(重新初始化)
} else {
if (blocks.nonEmpty) {
// Convert the previous batch into a PushRequest
requests +=PushRequest(mergerLocs(currentMergerId), blocks.toSeq,
createRequestBuffer(transportConf, dataFile, currentReqOffset, currentReqSize))
blocks = new ArrayBuffer[(BlockId, Int)]
}
// Start a new batch
currentReqSize = 0
// Set currentReqOffset to -1 so we are able to distinguish between the initial value
// of currentReqOffset and when we are about to start a new batch
currentReqOffset = -1
currentMergerId = mergerId
}
// push的blocks长度都是小于maxBlockSizeToPush
// Only push blocks under the size limit
if (blockSize <= maxBlockSizeToPush) {
val blockSizeInt = blockSize.toInt
blocks += ((ShufflePushBlockId(shuffleId, shuffleMergeId, partitionId,
reduceId), blockSizeInt))
// Only update currentReqOffset if the current block is the first in the request
if (currentReqOffset == -1) {
currentReqOffset = offset
}
if (currentReqSize == 0) {
currentReqSize += blockSizeInt
}
}
}
offset += blockSize
}
可见这里的算法的流程为:
同时需要注意这里push的blocks的大小都是小于maxBlockSizeToPush,这里用于跳过数据倾斜的分区块。
// Randomize the orders of the PushRequest, so different mappers pushing blocks at the same
// time won't be pushing the same ranges of shuffle partitions.
pushRequests ++= Utils.randomize(requests)
另外为了避免顺序的构造push chunk,导致Magnet上资源的热点和严重的争用冲突。在完成准备将shuffle data转换为push request后,将chunk按照编号进行了随机化处理,来避免所有map task按照相同次序push chunk。
在 Magnet shuffle service一侧,对于正在主动合并的每个 Shuffle 分区,Magnet shuffle service会生成合并的 Shuffle 文件,用来添加所有接收的相应 block 块。
它还为每个主动合并的 Shuffle 分区维护一些元数据。
这份元数据的唯一键由 applicationID,shuffle ID和 shuffle partition ID混合组成,并且放到一个 ConcurrentHashMap 中。
2
Magnet机器上需要维护一些重要的元信息,如上图所示,包括:
当Magnet shuffle service接收到 block 块时,在尝试添加到对应的 shuffle 合并文件之前,它首先要检索相应的 Shuffle 分区元数据。元数据可以帮助Magnet shuffle service正确处理一些潜在的异常场景。
例如,bitmap可帮助Magnet shuffle service识别任何潜在的重复块,因此没有多余的数据会被写入 Shuffle 合并文件中。
currentMapId用于保证当前正在append的block,即使Magnet shuffle service可以从不同的 Map 任务中接收同一个 shuffle 分区的多个 block 块,只有当currentMapId的 block 块完整地添加到 Shuffle 合并文件中,下一次写入才可开始。
并且,在遇到足以损坏整个 shuffle 合并文件的故障之前,可以将 block 块部分地添加到 Shuffle 合并文件中。当发生这种情况时,position offset会有助于将 Shuffle 合并文件带回到健康状态。下一个 block 块会从位置偏移量处开始添加,这可以有效地覆盖损坏的部分。如果损坏的 block 块是最后一个的话,block 合并操作结束之后将截断损坏的部分。通过追踪这份元数据,Magnet shuffle service可以在 block 块合并操作期间适宜地去处理重复,冲撞和故障的情况。
magnet shuffle服务通过Best-effort的方式来解决海量连接可靠性低的问题。在该体系上,所有连接异常都是non-fatal的,可以理解为每个环节上的连接断开或异常,都有一个对应的备选和兜底方案:
对于一个有着 M 个 Map 任务和 R 个 Reduce 任务的 Shuffle 来说,Spark Driver 会收集 M 个 MapStatus和 R 个 MergeStatus。
这些元数据会告诉 Spark Driver 每个未合并的 Shuffle block 块和已合并的 Shuffle 文件的位置和大小,还有哪些 block 块会合并到每一个 Shuffle 合并文件中。
因此,Spark Driver 可以完整的看到,怎样去获取每个 Reduce 任务已合并的Shuffle 文件和未合并的 Shuffle 块。当 Reduce 任务没能获取到 Shuffle 合并 block 块时,元数据便会能够回过头来获取原始的未合并的 block 块。
Magnet 尽最大可能有效地维护了两份 Shuffle 数据的副本。
Magnet允许 Spark原生地去管理 Shuffle 的各个方面,包括存储 Shuffle 数据,提供容错能力,还有可以追踪 Shuffle 数据的位置元数据信息。
在这种情况下,Spark 不依赖于外部的系统进行 Shuffle。
这允许灵活地将Magnet部署在计算/存储同一节点的 on-prem 集群和具有disaggregated storage layer的cloud-based的集群。对于计算和存储同一个节点的on prem数据中心,Shuffle Reduce 任务的数据本地性可以带来很多好处。
其中包括提高 I/O 效率,并且由于绕过网络传输减少了 Shuffle 获取失败的情况。
通过利用 Spark 的位置感知任务调度并且基于 Spark Executor 的位置信息选择 Magnet shuffle service来 push Shuffle block 块,实现 Shuffle 数据本地性似乎微不足道。
动态分配的功能使得 Spark 在一段时间内如果没有任务运行,则释放空闲的 Executor,并且如果任务再次待办,则可以稍后重新启动 Executor。
这使得 Spark 应用程序在多租户集群中资源更加富裕。
通过 Spark 动态分配,当 Driver 在 Shuffle Map Stage 的开头选择Magnet shuffle service列表时,由于 Executor 在前一个 Stage 的结尾会释放,活跃的 Spark Executor 的数量可能小于需求的数量。如果我们选择基于 Spark Executor 位置信息的 Magnet shuffle service,我们最终可能比需求的 Shuffle 服务更少。
为了解决这个问题,我们选择在活跃 Spark Executor 之外位置的Magnet shuffle service,并通过基于所选Magnet shuffle service位置信息的动态分配机制来启动 Spark Executor。这样的话,我们基于Magnet shuffle service的位置信息来启动 Spark Executor,而不会去基于 Spark Executor 的位置信息来选择 Magnet shuffle service。由于Magnet和 Spark 原生的 Shuffle 集成,因此可以进行这种优化。
对于cloud-based
的集群部署,计算和存储节点通常是分开的。
在这样的部署中, Shuffle 中间数据可以通过快速网络连接在disaggregated storage
中物化。
Shuffle Reduce 任务的数据本地性在这种设置中不再重要。然而,Magnet仍然适合这种cloud-based的部署。Magnet shuffle service在计算节点上运行,在 disaggregated storage 节点上面存储合并的 shuffle 文件。通过读取更大的数据 chunk 块而不是横跨网络的细碎的 shuffle block 块,Magnet有助于更好地利用可用网络带宽。
此外,Spark Executor 在选择Magnet shuffle service的时候可以选择优化更好的负载均衡而不是数据本地性。Spark Driver 可以查询可用Magnet shuffle service的负载,以便选择负载低的。在我们的Magnet实现中,我们允许通过灵活的政策来选择Magnet shuffle service的位置。
因此,我们可以选择根据集群的部署模式要么优化数据本地性要么优化负载均衡,或者两者都有也行。
当所有的 Map 任务在 Shuffle Map Stage 结尾完成的时候,Shuffle block 块推送操作可能还没有完全完成。
此时有一批 Map 任务刚刚开始推送 block 块,也可能有落后者做不到足够快地推送 block 块。不同于 Reduce 任务中的落后者,我们在 Shuffle Map Stage 结尾经历的任何延迟都将直接影响作业的运行时间。
为了缓解这样的落后,Magnet允许 Spark Driver 设置期望等待 block 块推送/合并操作的时间上限。
magnet服务设置了push-merge超时时间,如果block没有在超时时间内完成push-merge,magnet服务会停止继续接受block,提前让reduce task开始执行;而未完成push-merge的block,根据上面中提到的Best-effort方案,reduce task会从MapStatus中获取状态与位置信息,直接拉取没有merge的block数据。
然而,它确保Magnet可以提供 push/merge shuffle 的大部分益处,同时将落后者的负面影响限制在 Spark 应用程序的运行时间内。
在Spark shuffle过程中,如果某个partition的shuffle数据量远高于其他partition,则会出现数据倾斜(data skew)问题。 data skew 不是magnet特有的问题,而是在Spark上已经有成熟解决方案,即 AQE。
magnet需要适配Spark 的adaptive execution特性,同时防止一个magnet服务上因data skew而导致有 100GB / 1TB级别的数据需要merge。为此,针对上文的算法可以看出,push的blocks的大小都是小于maxBlockSizeToPush,通过限制 size超过阈值的block被并入到chunk中;如果超过阈值,则会利用上节中的Best-effort方案,直接拉取未完成merge的block数据。而普通的、未有data skew情况的block,则会走正常的push-merge流程。
服务器端配置(yarn-site.xml)
# 默认的push based shuffle是关闭的。如果需要开启请设置为:org.apache.spark.network.shuffle.RemoteBlockPushResolver。
spark.shuffle.push.server.mergedShuffleFileManagerImpl=org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager
# 在push-based shuffle期间将合并的shuffle文件划分为多个块时最小的大小,默认为2m。
spark.shuffle.push.server.minChunkSizeInMergedShuffleFile=2m
# 缓存大小,可以存储合并的索引文件
spark.shuffle.push.server.mergedIndexCacheSize=100m
客户端配置
3
接下来我们将从源码的角度进行进一步的分析。