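BlockManager is Spark's storage subsystem. RDD data, shuffle data, broadcast variables and so on are all stored and retrieved through BlockManager. Data in Spark exists in the form of blocks, and blocks can be replicated and synchronized between BlockManagers. BlockManagers use Spark's networking foundation, NettyRPC, to exchange block metadata and information about which blocks each of them holds.
BlockManager also follows a master/worker architecture: the driver side is the master and the executor side is the worker. Each executor-side BlockManager registers itself with the driver, so the driver knows about every executor-side BlockManager, including its address and resource status such as memory. Executor-side BlockManagers also report all the blocks they manage to the driver-side BlockManager, which lets the driver offer a query service for the metadata of every block.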
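Conversely, the driver-side BlockManager can instruct executor-side BlockManagers to perform storage-related operations such as RemoveBlock, RemoveRdd, RemoveShuffle, DecommissionBlockManager, RemoveBroadcast, GetMatchingBlockIds, and ReplicateBlock. Handling a ReplicateBlock request means copying block data to a peer BlockManager. In other words, BlockManagers can talk to each other directly in order to replicate, upload, or download blocks.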
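As a rough illustration of the driver-side bookkeeping described above (registration, block reports, location queries), here is a minimal in-memory sketch. It is not Spark's code. `MiniBlockManagerMaster`, `BmId`, `register`, `updateBlockInfo` and `getLocations` are hypothetical simplifications. The real BlockManagerMasterEndpoint does this over RPC, and it also tracks memory, storage levels and shuffle-service state.

```scala
import scala.collection.mutable

// Hypothetical, simplified identifier of a BlockManager (executor id + address).
final case class BmId(executorId: String, host: String, port: Int)

// Hypothetical driver-side registry; not Spark's actual API.
class MiniBlockManagerMaster {
  // executor's BlockManager -> ids of the blocks it currently holds
  private val blocksByBm = mutable.Map.empty[BmId, mutable.Set[String]]
  // block id -> BlockManagers that hold a copy of it
  private val blockLocations = mutable.Map.empty[String, mutable.Set[BmId]]

  // An executor registers its BlockManager when it starts up.
  def register(id: BmId): Unit = {
    if (!blocksByBm.contains(id)) blocksByBm(id) = mutable.Set.empty[String]
  }

  // An executor reports that it now stores (or no longer stores) a block.
  def updateBlockInfo(id: BmId, blockId: String, stored: Boolean): Boolean =
    blocksByBm.get(id) match {
      case None => false // unknown BlockManager: it has to register first
      case Some(blocks) =>
        val locs = blockLocations.getOrElseUpdate(blockId, mutable.Set.empty[BmId])
        if (stored) {
          blocks += blockId
          locs += id
        } else {
          blocks -= blockId
          locs -= id
          if (locs.isEmpty) blockLocations -= blockId
        }
        true
    }

  // Anyone (driver or executor) can ask where a block lives.
  def getLocations(blockId: String): Set[BmId] =
    blockLocations.get(blockId).map(_.toSet).getOrElse(Set.empty)
}
```

The two maps mirror the two questions the driver must answer quickly: what a given BlockManager holds, and where a given block lives.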
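BlockManager is initialized as a field of SparkEnv when SparkEnv is instantiated. SparkEnv exists on both the driver and the executors, so BlockManager naturally exists on both sides as well, but it plays a different role on each. On the driver it is the master, which collects, maintains, and answers queries about all BlockManagers and blocks in the cluster. On an executor it maintains the block data held by that executor and supports reading and writing it. It also registers itself with the driver-side BlockManager, which tracks it as a BlockManagerInfo. The executor-side BlockManager plays a role similar to a DataNode in HDFS: it reads, writes, and replicates blocks and reports block status.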
While BlockManager starts up, it uses RpcEnv to instantiate its network service so that other BlockManagers can access it.
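The BlockManager class is the entry point to Spark's entire storage system and provides nearly all data storage and access functionality. The three most important kinds of data it handles are RDD data, shuffle data, and broadcast variables.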
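From the interaction point of view, the executor-side BlockManager's functions include:
BlockManager is the entry point for outside callers, but the actual work is done by various underlying components, including:
Most of these underlying components are passed to BlockManager as constructor arguments when SparkEnv constructs it: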
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
val memoryManager: MemoryManager = UnifiedMemoryManager(conf, numUsableCores)
val blockManagerPort = if (isDriver) {
  conf.get(DRIVER_BLOCK_MANAGER_PORT)
} else {
  conf.get(BLOCK_MANAGER_PORT)
}
val externalShuffleClient = if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
  val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
  Some(new ExternalBlockStoreClient(transConf, securityManager,
    securityManager.isAuthenticationEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT)))
} else {
  None
}
// Mapping from block manager id to the block manager's information.
val blockManagerInfo = new concurrent.TrieMap[BlockManagerId, BlockManagerInfo]()
val blockManagerMaster = new BlockManagerMaster(
  registerOrLookupEndpoint(
    BlockManagerMaster.DRIVER_ENDPOINT_NAME,
    new BlockManagerMasterEndpoint(
      rpcEnv,
      isLocal,
      conf,
      listenerBus,
      if (conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)) {
        externalShuffleClient
      } else {
        None
      }, blockManagerInfo,
      mapOutputTracker.asInstanceOf[MapOutputTrackerMaster])),
  registerOrLookupEndpoint(
    BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME,
    new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, blockManagerInfo)),
  conf,
  isDriver)
val blockTransferService =
  new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
    blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint)
// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(
  executorId,
  rpcEnv,
  blockManagerMaster,
  serializerManager,
  conf,
  memoryManager,
  mapOutputTracker,
  shuffleManager,
  blockTransferService,
  securityManager,
  externalShuffleClient)
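Below are a few of the key components.
Within BlockManager, the RPC handling logic lives mainly in the second and tenth items of the component list above.
BlockManagerMasterEndpoint receives requests from all executor-side BlockManagers and from SparkContext. See the code: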
// org.apache.spark.storage.BlockManagerMasterEndpoint#receiveAndReply
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // Sent when an executor-side BlockManager initializes or comes back online
  case RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint) =>
    context.reply(register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint))
  // Sent by an executor-side BlockManager to report a change to its block data
  case _updateBlockInfo @
      UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
    val isSuccess = updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
    context.reply(isSuccess)
    // SPARK-30594: we should not post `SparkListenerBlockUpdated` when updateBlockInfo
    // returns false since the block info would be updated again later.
    if (isSuccess) {
      listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
    }
  case GetLocations(blockId) =>
    context.reply(getLocations(blockId))
  case GetLocationsAndStatus(blockId, requesterHost) =>
    context.reply(getLocationsAndStatus(blockId, requesterHost))
  case GetLocationsMultipleBlockIds(blockIds) =>
    context.reply(getLocationsMultipleBlockIds(blockIds))
  case GetPeers(blockManagerId) =>
    context.reply(getPeers(blockManagerId))
  case GetExecutorEndpointRef(executorId) =>
    context.reply(getExecutorEndpointRef(executorId))
  case GetMemoryStatus =>
    context.reply(memoryStatus)
  case GetStorageStatus =>
    context.reply(storageStatus)
  case GetBlockStatus(blockId, askStorageEndpoints) =>
    context.reply(blockStatus(blockId, askStorageEndpoints))
  case GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter) =>
    context.reply(getShufflePushMergerLocations(numMergersNeeded, hostsToFilter))
  case RemoveShufflePushMergerLocation(host) =>
    context.reply(removeShufflePushMergerLocation(host))
  case IsExecutorAlive(executorId) =>
    context.reply(blockManagerIdByExecutor.contains(executorId))
  case GetMatchingBlockIds(filter, askStorageEndpoints) =>
    context.reply(getMatchingBlockIds(filter, askStorageEndpoints))
  // Reached from SparkContext.unpersistRDD()
  case RemoveRdd(rddId) =>
    context.reply(removeRdd(rddId))
  case RemoveShuffle(shuffleId) =>
    context.reply(removeShuffle(shuffleId))
  case RemoveBroadcast(broadcastId, removeFromDriver) =>
    context.reply(removeBroadcast(broadcastId, removeFromDriver))
  case RemoveBlock(blockId) =>
    removeBlockFromWorkers(blockId)
    context.reply(true)
  case RemoveExecutor(execId) =>
    removeExecutor(execId)
    context.reply(true)
  case DecommissionBlockManagers(executorIds) =>
    // Mark corresponding BlockManagers as being decommissioning by adding them to
    // decommissioningBlockManagerSet, so they won't be used to replicate or migrate blocks.
    // Note that BlockManagerStorageEndpoint will be notified about decommissioning when the
    // executor is notified(see BlockManager.decommissionSelf), so we don't need to send the
    // notification here.
    val bms = executorIds.flatMap(blockManagerIdByExecutor.get)
    logInfo(s"Mark BlockManagers (${bms.mkString(", ")}) as being decommissioning.")
    decommissioningBlockManagerSet ++= bms
    context.reply(true)
  case GetReplicateInfoForRDDBlocks(blockManagerId) =>
    context.reply(getReplicateInfoForRDDBlocks(blockManagerId))
  case StopBlockManagerMaster =>
    context.reply(true)
    stop()
}
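`receiveAndReply` returns a `PartialFunction[Any, Unit]`. It pattern-matches on case-class messages and answers through the call context. The sketch below reproduces that dispatch style with a few made-up message types and a hypothetical `Reply` class that stands in for Spark's `RpcCallContext`. It is only an illustration of the pattern, not Spark's RPC layer.

```scala
// Hypothetical message types, loosely mirroring a few of the cases above.
sealed trait Msg
final case class GetLocations(blockId: String) extends Msg
final case class RemoveRdd(rddId: Int) extends Msg
case object StopMaster extends Msg

// Stand-in for RpcCallContext: it simply records the last reply.
final class Reply {
  var value: Option[Any] = None
  def reply(v: Any): Unit = value = Some(v)
}

// Hypothetical endpoint: block locations and per-RDD block counts are given up front.
class MiniMasterEndpoint(locations: Map[String, Seq[String]], rddBlocks: Map[Int, Int]) {
  var stopped = false

  def receiveAndReply(context: Reply): PartialFunction[Any, Unit] = {
    case GetLocations(blockId) =>
      context.reply(locations.getOrElse(blockId, Seq.empty))
    case RemoveRdd(rddId) =>
      // Reply with the number of blocks that would be removed for this RDD.
      context.reply(rddBlocks.getOrElse(rddId, 0))
    case StopMaster =>
      context.reply(true) // reply first, then stop, as the endpoint above does
      stopped = true
  }
}
```

Because the handler is a partial function, the caller can check `isDefinedAt` before dispatching. Messages that match no case are simply not handled.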
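If you follow the code further, you will find that many block-related operations such as RemoveRdd ultimately call the corresponding service on each executor's BlockManagerStorageEndpoint. Take the handling of RemoveRdd as an example: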
private def removeRdd(rddId: Int): Future[Seq[Int]] = {
  // First remove the metadata for the given RDD, and then asynchronously remove the blocks
  // from the storage endpoints.
  // The message sent to the storage endpoints to remove the RDD
  val removeMsg = RemoveRdd(rddId)
  // Find all blocks for the given RDD, remove the block from both blockLocations and
  // the blockManagerInfo that is tracking the blocks and create the futures which asynchronously
  // remove the blocks from storage endpoints and gives back the number of removed blocks
  val blocks = blockLocations.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
  val blocksToDeleteByShuffleService =
    new mutable.HashMap[BlockManagerId, mutable.HashSet[RDDBlockId]]
  blocks.foreach { blockId =>
    val bms: mutable.HashSet[BlockManagerId] = blockLocations.remove(blockId)
    // Split the block's locations into two groups: entries served by the external
    // shuffle service, and the executors' own built-in BlockManagers
    val (bmIdsExtShuffle, bmIdsExecutor) = bms.partition(_.port == externalShuffleServicePort)
    val liveExecutorsForBlock = bmIdsExecutor.map(_.executorId).toSet
    bmIdsExtShuffle.foreach { bmIdForShuffleService =>
      // if the original executor is already released then delete this disk block via
      // the external shuffle service
      if (!liveExecutorsForBlock.contains(bmIdForShuffleService.executorId)) {
        val blockIdsToDel = blocksToDeleteByShuffleService.getOrElseUpdate(bmIdForShuffleService,
          new mutable.HashSet[RDDBlockId]())
        blockIdsToDel += blockId
        blockStatusByShuffleService.get(bmIdForShuffleService).foreach { blockStatus =>
          blockStatus.remove(blockId)
        }
      }
    }
    // Remove the block from the in-memory metadata kept for each executor's BlockManager
    bmIdsExecutor.foreach { bmId =>
      blockManagerInfo.get(bmId).foreach { bmInfo =>
        bmInfo.removeBlock(blockId)
      }
    }
  }
  // Ask every executor's storage endpoint to delete the RDD's blocks
  val removeRddFromExecutorsFutures = blockManagerInfo.values.map { bmInfo =>
    bmInfo.storageEndpoint.ask[Int](removeMsg).recover {
      // use 0 as default value means no blocks were removed
      handleBlockRemovalFailure("RDD", rddId.toString, bmInfo.blockManagerId, 0)
    }
  }.toSeq
  // Ask the external shuffle service to delete blocks whose executors are gone
  val removeRddBlockViaExtShuffleServiceFutures = externalBlockStoreClient.map { shuffleClient =>
    blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
      Future[Int] {
        val numRemovedBlocks = shuffleClient.removeBlocks(
          bmId.host,
          bmId.port,
          bmId.executorId,
          blockIds.map(_.toString).toArray)
        numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
      }
    }
  }.getOrElse(Seq.empty)
  // Combine both groups of removals into the method's Future[Seq[Int]] result
  Future.sequence(removeRddFromExecutorsFutures ++ removeRddBlockViaExtShuffleServiceFutures)
}
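The central decision in `removeRdd` is how to split a block's locations. Copies held by executor BlockManagers are deleted through those executors. Copies registered under the external shuffle service's port are deleted through the shuffle service, but only if the executor that wrote them is no longer alive. That split can be written as a pure function. `Loc` and `planRemoval` are hypothetical names for this sketch, and 7337 in the usage is just an example port.

```scala
// Hypothetical, simplified BlockManagerId.
final case class Loc(executorId: String, host: String, port: Int)

/** Returns (locations to clean via executors, locations to clean via the external shuffle service). */
def planRemoval(locations: Set[Loc], externalShuffleServicePort: Int): (Set[Loc], Set[Loc]) = {
  // Same partition as in removeRdd: shuffle-service entries vs. executor BlockManagers.
  val (viaShuffleService, viaExecutor) = locations.partition(_.port == externalShuffleServicePort)
  val liveExecutors = viaExecutor.map(_.executorId)
  // Only go through the shuffle service when the owning executor is gone.
  val orphaned = viaShuffleService.filterNot(l => liveExecutors.contains(l.executorId))
  (viaExecutor, orphaned)
}
```

For example, with executor 1 still alive and executor 2 gone, only executor 2's shuffle-service copy is routed to the shuffle service.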
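DiskStore: provides block-oriented read and write interfaces.
Write: given a blockId, DiskStore uses DiskBlockManager to determine the block's file on disk, then writes the data with the caller-supplied writeFunc. After the write completes, it updates in-memory structures such as blockSizes.
Read: reads are lazy. Given a BlockId, DiskStore first determines the file path and blockSize. It then constructs a DiskBlockData or EncryptedBlockData object from the file and block size without actually reading any data. The read happens later, when the data is used, through methods such as toChunkedByteBuffer.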
// org.apache.spark.storage.DiskStore
def getBytes(blockId: BlockId): BlockData = {
  getBytes(diskManager.getFile(blockId.name), getSize(blockId))
}
def getBytes(f: File, blockSize: Long): BlockData = securityManager.getIOEncryptionKey() match {
  case Some(key) =>
    // Encrypted blocks cannot be memory mapped; return a special object that does decryption
    // and provides InputStream / FileRegion implementations for reading the data.
    new EncryptedBlockData(f, blockSize, conf, key)
  case _ =>
    new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, f, blockSize)
}
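The lazy-read idea in `DiskStore.getBytes` can be shown without Spark. The returned object only remembers the file and its size, and the bytes are read when a caller actually asks for them. `LazyFileBlock`, `toByteArray` and the `reads` counter are hypothetical names for this sketch. Spark's DiskBlockData additionally decides whether to memory-map the file (note the minMemoryMapBytes/maxMemoryMapBytes arguments above).

```scala
import java.nio.file.{Files, Path}

// Hypothetical stand-in for DiskBlockData: holds only metadata until it is read.
final class LazyFileBlock(val file: Path, val size: Long) {
  var reads = 0 // counts how many times the file was actually read

  def toByteArray: Array[Byte] = {
    reads += 1
    val bytes = Files.readAllBytes(file)
    require(bytes.length == size, s"expected $size bytes, found ${bytes.length}")
    bytes
  }
}

// Mirrors getBytes(blockId): resolve the file and its size, but do not read yet.
def getBytes(file: Path): LazyFileBlock = new LazyFileBlock(file, Files.size(file))
```

Constructing the object costs only a metadata lookup, so callers that end up streaming or discarding the block never pay for a full read.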
DiskBlockManager does not operate on block data itself. It only maintains an executor's localDirs and the mapping from BlockId to file path.
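Because the BlockId-to-file mapping is purely computational, it can be sketched in a few lines. The name is hashed, the hash picks one of the local directories, and then it picks one of a fixed number of sub-directories under that directory. The layout below is assumed to follow the hash-based scheme of Spark's `DiskBlockManager.getFile`: directory index `hash % localDirs.length`, sub-directory `(hash / localDirs.length) % subDirsPerLocalDir`, named with two hex digits. Treat those details, and the helper names `nonNegativeHash` and `fileFor`, as assumptions of this sketch.

```scala
import java.io.File

// Non-negative hash of a name (assumed equivalent in spirit to Spark's Utils.nonNegativeHash).
def nonNegativeHash(s: String): Int = {
  val h = s.hashCode
  if (h == Int.MinValue) 0 else math.abs(h)
}

// Hypothetical helper: map a block file name to <localDir>/<2-hex-digit subdir>/<name>.
def fileFor(name: String, localDirs: Array[File], subDirsPerLocalDir: Int): File = {
  val hash = nonNegativeHash(name)
  val dirId = hash % localDirs.length
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
  val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
  new File(subDir, name)
}
```

The same name always maps to the same file, so no lookup table is needed. Spreading files over many sub-directories keeps any single directory from growing too large.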
MemoryStore provides block-oriented read and write interfaces. Functionally, it plays the same role as DiskStore.
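MemoryManager manages memory: it divides memory into regions and handles allocation and release in each region. MemoryManager relies on MemoryPool to manage each individual region. The regions are on-heap storage (onHeapStorageMemoryPool), on-heap execution (onHeapExecutionMemoryPool), off-heap storage (offHeapStorageMemoryPool), and off-heap execution (offHeapExecutionMemoryPool).
MemoryManager uses the MemoryPools of these four regions to implement memory allocation and release within each region.
When one side cannot satisfy a request, onHeapStorageMemoryPool and onHeapExecutionMemoryPool can borrow space from each other.
The same applies to offHeapStorageMemoryPool and offHeapExecutionMemoryPool.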
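The borrowing is asymmetric. Storage may borrow any execution memory that is currently free, but it cannot make execution give back memory that execution is using. Execution can take back whichever is larger: the storage pool's free memory, or everything the storage pool holds beyond StorageRegionSize (evicting cached blocks if necessary).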
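If storage still lacks space after borrowing, it starts evicting blocks. Only blocks that do not belong to the same RDD as the block requesting space are eligible. Eviction only means removing a block from memory. Depending on the block's StorageLevel, an evicted block is either dropped from the BlockManager entirely or moved to the DiskStore.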
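The asymmetric borrowing rule comes down to two small formulas over a storage pool and an execution pool that share a fixed total. The sketch below is a simplified model that rests on several assumptions. Sizes are in bytes. `Pools`, `storageBorrow` and `executionReclaim` are hypothetical names. Eviction is modeled as simply lowering `storageUsed`. In Spark, the execution side of this logic lives in `UnifiedMemoryManager` (its `maybeGrowExecutionPool` helper).

```scala
// Simplified model of the two pools of one memory mode; all sizes are bytes.
final class Pools(total: Long, val storageRegionSize: Long) {
  var storagePoolSize: Long = storageRegionSize
  var storageUsed: Long = 0L
  var executionUsed: Long = 0L
  def executionPoolSize: Long = total - storagePoolSize
  def storageFree: Long = storagePoolSize - storageUsed
  def executionFree: Long = executionPoolSize - executionUsed

  // Storage may only take execution memory that is currently free.
  def storageBorrow(needed: Long): Long = {
    val borrowed = math.min(executionFree, math.max(0L, needed - storageFree))
    storagePoolSize += borrowed
    borrowed
  }

  // Execution may take storage's free memory, or everything storage holds beyond
  // storageRegionSize (which requires evicting cached blocks), whichever is larger.
  def executionReclaim(needed: Long): Long = {
    val reclaimable = math.max(storageFree, storagePoolSize - storageRegionSize)
    val taken = math.min(math.max(0L, reclaimable), needed)
    val evicted = math.max(0L, taken - storageFree) // cached bytes that must be dropped
    storageUsed -= evicted
    storagePoolSize -= taken
    taken
  }
}
```

In the usage below, storage grows to 800 bytes by borrowing free execution memory. Execution can later push it back down to the 500-byte region, but no further.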
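The following uses memory allocation in the on-heap storage region (OnHeapStorageMemory) as an example of how this is implemented.
Approach: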
// org.apache.spark.memory.UnifiedMemoryManager#acquireStorageMemory
override def acquireStorageMemory(
    blockId: BlockId,
    numBytes: Long,
    memoryMode: MemoryMode): Boolean = synchronized {
  assertInvariants()
  assert(numBytes >= 0)
  // Pick the pools involved, according to the memory mode
  val (executionPool, storagePool, maxMemory) = memoryMode match {
    case MemoryMode.ON_HEAP => (
      onHeapExecutionMemoryPool,
      onHeapStorageMemoryPool,
      maxOnHeapStorageMemory)
    case MemoryMode.OFF_HEAP => (
      offHeapExecutionMemoryPool,
      offHeapStorageMemoryPool,
      maxOffHeapStorageMemory)
  }
  // maxMemory is all of the on-heap/off-heap memory except what execution already uses.
  // In other words, apart from execution's used memory, everything else can be
  // considered for eviction and reuse.
  if (numBytes > maxMemory) {
    // Fail fast if the block simply won't fit
    logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
      s"memory limit ($maxMemory bytes)")
    return false
  }
  // Borrow from the execution pool
  if (numBytes > storagePool.memoryFree) {
    // There is not enough free memory in the storage pool, so try to borrow free memory from
    // the execution pool.
    val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree,
      numBytes - storagePool.memoryFree)
    executionPool.decrementPoolSize(memoryBorrowedFromExecution)
    storagePool.incrementPoolSize(memoryBorrowedFromExecution)
  }
  // Allocate inside the storage pool, which may evict blocks. This two-argument overload
  // computes numBytesToFree = max(0, numBytes - memoryFree) and calls the version below.
  storagePool.acquireMemory(blockId, numBytes)
}
// org.apache.spark.memory.StorageMemoryPool#acquireMemory
def acquireMemory(
    blockId: BlockId,
    numBytesToAcquire: Long,
    numBytesToFree: Long): Boolean = lock.synchronized {
  assert(numBytesToAcquire >= 0)
  assert(numBytesToFree >= 0)
  assert(memoryUsed <= poolSize)
  if (numBytesToFree > 0) {
    // Select blocks to evict in order to free numBytesToFree bytes
    memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
  }
  // NOTE: If the memory store evicts blocks, then those evictions will synchronously call
  // back into this StorageMemoryPool in order to free memory. Therefore, these variables
  // should have been updated.
  val enoughMemory = numBytesToAcquire <= memoryFree
  if (enoughMemory) {
    _memoryUsed += numBytesToAcquire
  }
  enoughMemory
}
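Putting `acquireStorageMemory` and `StorageMemoryPool.acquireMemory` together gives four steps. First, fail fast if the request exceeds the storage ceiling. Second, borrow free execution memory for the shortfall. Third, compute how much still has to be freed. Fourth, let an eviction callback release cached blocks before checking again. The compact sketch below simplifies this: it covers a single memory mode, `StoragePool` is a hypothetical class, and the `evict` callback just returns how many bytes it dropped. In Spark, the MemoryStore calls back into the pool during eviction.

```scala
// Hypothetical simplified storage pool with an eviction callback.
final class StoragePool(var poolSize: Long, evict: Long => Long) {
  var memoryUsed: Long = 0L
  def memoryFree: Long = poolSize - memoryUsed

  // Two-argument form: derive how much must be freed, then delegate.
  def acquireMemory(numBytes: Long): Boolean =
    acquireMemory(numBytes, math.max(0L, numBytes - memoryFree))

  def acquireMemory(numBytesToAcquire: Long, numBytesToFree: Long): Boolean = {
    if (numBytesToFree > 0) {
      // Dropped blocks release their memory back into this pool.
      memoryUsed -= math.min(memoryUsed, evict(numBytesToFree))
    }
    val enough = numBytesToAcquire <= memoryFree
    if (enough) memoryUsed += numBytesToAcquire
    enough
  }
}

// Mirrors the shape of acquireStorageMemory; returns (success, bytes borrowed).
def acquireStorageMemory(numBytes: Long, maxMemory: Long, executionFree: Long,
                         storage: StoragePool): (Boolean, Long) =
  if (numBytes > maxMemory) (false, 0L) // fail fast: the block can never fit
  else {
    // Borrow only the shortfall, and only out of execution's free memory.
    val borrowed =
      if (numBytes > storage.memoryFree) math.min(executionFree, numBytes - storage.memoryFree)
      else 0L
    storage.poolSize += borrowed // the execution pool would shrink by the same amount
    (storage.acquireMemory(numBytes), borrowed)
  }
```

In the first usage case, 20 bytes are borrowed and 10 bytes are evicted to fit a 40-byte block. In the second case, nothing can be borrowed or evicted, so the request fails.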
Next, let's look at MemoryStore.
Data structures:
The important methods of MemoryStore, which also form the API it offers to other components:
def getBytes(blockId: BlockId): Option[ChunkedByteBuffer]
def getValues(blockId: BlockId): Option[Iterator[_]]
private def putIterator[T](
    blockId: BlockId,
    values: Iterator[T],
    classTag: ClassTag[T],
    memoryMode: MemoryMode,
    valuesHolder: ValuesHolder[T]): Either[Long, Long]
// Simple logic: once storage memory for `size` bytes has been acquired, build a
// MemoryEntry from the _bytes argument and put it into the entries map
def putBytes[T: ClassTag](
    blockId: BlockId,
    size: Long,
    memoryMode: MemoryMode,
    _bytes: () => ChunkedByteBuffer): Boolean
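MemoryStore keeps its blocks in the `entries` map. Once memory is granted, `putBytes` just wraps the bytes in a MemoryEntry and inserts it. The sketch below models `entries` as an access-ordered `java.util.LinkedHashMap`, which is assumed here to match how Spark declares it. It also adds an eviction routine with three rules, implementing the rule stated earlier: walk blocks from least to most recently used, skip blocks of the RDD that is asking for space, and drop nothing unless enough memory can be freed. `MiniMemoryStore`, `Entry`, `rddOf` (which parses `rdd_<rddId>_<partition>` names) and `evictToFree` are hypothetical. Real eviction also consults each block's StorageLevel to decide whether to move the block to disk.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for MemoryEntry: only the size matters here.
final case class Entry(size: Long)

final class MiniMemoryStore {
  // accessOrder = true: iteration goes from least- to most-recently accessed entry.
  private val entries = new JLinkedHashMap[String, Entry](32, 0.75f, true)

  def putBytes(blockId: String, size: Long): Unit = entries.synchronized {
    entries.put(blockId, Entry(size))
  }

  // get() also moves the entry to the most-recently-used end.
  def get(blockId: String): Option[Entry] = entries.synchronized(Option(entries.get(blockId)))

  def contains(blockId: String): Boolean = entries.synchronized(entries.containsKey(blockId))

  // "rdd_3_1" -> Some(3); any other block name -> None.
  private def rddOf(blockId: String): Option[Int] = blockId.split("_") match {
    case Array("rdd", rdd, _) => rdd.toIntOption
    case _ => None
  }

  /** Evicts LRU blocks outside the requester's RDD; returns the bytes freed (0 if not enough). */
  def evictToFree(requester: String, bytesNeeded: Long): Long = entries.synchronized {
    val requesterRdd = rddOf(requester)
    val victims = ArrayBuffer.empty[String]
    var freed = 0L
    val it = entries.entrySet().iterator()
    while (freed < bytesNeeded && it.hasNext) {
      val e = it.next()
      // Never evict blocks of the RDD that is asking for space.
      val sameRdd = requesterRdd.isDefined && requesterRdd == rddOf(e.getKey)
      if (!sameRdd) { victims += e.getKey; freed += e.getValue.size }
    }
    // Only drop blocks if doing so actually frees enough memory.
    if (freed >= bytesNeeded) { victims.foreach(id => entries.remove(id)); freed } else 0L
  }
}
```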
Below is a brief analysis of putIterator. Approach:
The code:
// org.apache.spark.storage.memory.MemoryStore#putIterator
private def putIterator[T](
    blockId: BlockId,
    values: Iterator[T],
    classTag: ClassTag[T],
    memoryMode: MemoryMode,
    valuesHolder: ValuesHolder[T]): Either[Long, Long] = {
  require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
  // Number of elements unrolled so far
  var elementsUnrolled = 0
  // Whether there is still enough memory for us to continue unrolling this block
  var keepUnrolling = true
  // Initial per-task memory to request for unrolling blocks (bytes).
  val initialMemoryThreshold = unrollMemoryThreshold
  // How often to check whether we need to request more memory
  val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
  // Memory currently reserved by this task for this particular unrolling operation
  var memoryThreshold = initialMemoryThreshold
  // Memory to request as a multiple of current vector size
  val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
  // Keep track of unroll memory used by this particular block / putIterator() operation
  var unrollMemoryUsedByThisBlock = 0L
  // Request enough memory to begin unrolling
  keepUnrolling =
    reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)
  if (!keepUnrolling) {
    logWarning(s"Failed to reserve initial memory threshold of " +
      s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
  } else {
    unrollMemoryUsedByThisBlock += initialMemoryThreshold
  }
  // Unroll this block safely, checking whether we have exceeded our threshold periodically
  while (values.hasNext && keepUnrolling) {
    valuesHolder.storeValue(values.next())
    if (elementsUnrolled % memoryCheckPeriod == 0) {
      val currentSize = valuesHolder.estimatedSize()
      // If our vector's size has exceeded the threshold, request more memory
      if (currentSize >= memoryThreshold) {
        val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
        keepUnrolling =
          reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
        if (keepUnrolling) {
          unrollMemoryUsedByThisBlock += amountToRequest
        }
        // New threshold is currentSize * memoryGrowthFactor
        memoryThreshold += amountToRequest
      }
    }
    elementsUnrolled += 1
  }
  // Make sure that we have enough memory to store the block. By this point, it is possible that
  // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
  // perform one final call to attempt to allocate additional memory if necessary.
  if (keepUnrolling) {
    val entryBuilder = valuesHolder.getBuilder()
    val size = entryBuilder.preciseSize
    if (size > unrollMemoryUsedByThisBlock) {
      val amountToRequest = size - unrollMemoryUsedByThisBlock
      keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
      if (keepUnrolling) {
        unrollMemoryUsedByThisBlock += amountToRequest
      }
    }
    if (keepUnrolling) {
      val entry = entryBuilder.build()
      // Synchronize so that transfer is atomic
      memoryManager.synchronized {
        releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
        val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
        assert(success, "transferring unroll memory to storage memory failed")
      }
      entries.synchronized {
        entries.put(blockId, entry)
      }
      logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(blockId,
        Utils.bytesToString(entry.size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
      Right(entry.size)
    } else {
      // We ran out of space while unrolling the values for this block
      logUnrollFailureMessage(blockId, entryBuilder.preciseSize)
      Left(unrollMemoryUsedByThisBlock)
    }
  } else {
    // We ran out of space while unrolling the values for this block
    logUnrollFailureMessage(blockId, valuesHolder.estimatedSize())
    Left(unrollMemoryUsedByThisBlock)
  }
}
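The unrolling loop in `putIterator` follows a simple growth policy. It reserves a small initial budget and checks the estimated size every `memoryCheckPeriod` elements. Whenever the size crosses the reserved threshold, it requests enough memory to raise the threshold to `currentSize * memoryGrowthFactor`. The sketch below isolates that policy under two assumptions: a hypothetical `reserve` function that stands in for `reserveUnrollMemoryForThisTask`, and a fixed size estimate of 8 bytes per element in place of `valuesHolder.estimatedSize()`. Like the original, it returns `Right(size)` on success and `Left(reserved)` when memory runs out.

```scala
/**
 * Hypothetical sketch of putIterator's unroll policy.
 * reserve(n) returns true if n more bytes could be reserved for this task.
 */
def unroll[T](values: Iterator[T], reserve: Long => Boolean,
              initialThreshold: Long, checkPeriod: Int, growthFactor: Double,
              bytesPerElement: Long = 8L): Either[Long, Long] = {
  val stored = scala.collection.mutable.ArrayBuffer.empty[T]
  def estimatedSize: Long = stored.size * bytesPerElement
  var reserved = 0L
  var threshold = initialThreshold
  var keepUnrolling = reserve(initialThreshold)
  if (keepUnrolling) reserved += initialThreshold
  var elementsUnrolled = 0
  while (values.hasNext && keepUnrolling) {
    stored += values.next()
    if (elementsUnrolled % checkPeriod == 0) {
      val currentSize = estimatedSize
      if (currentSize >= threshold) {
        // Request enough to raise the threshold to currentSize * growthFactor.
        val amountToRequest = (currentSize * growthFactor - threshold).toLong
        keepUnrolling = reserve(amountToRequest)
        if (keepUnrolling) reserved += amountToRequest
        threshold += amountToRequest
      }
    }
    elementsUnrolled += 1
  }
  // Final top-up in case the real size slightly exceeds what was reserved.
  if (keepUnrolling && estimatedSize > reserved) {
    val extra = estimatedSize - reserved
    keepUnrolling = reserve(extra)
    if (keepUnrolling) reserved += extra
  }
  if (keepUnrolling) Right(estimatedSize) else Left(reserved)
}
```

Growing the reservation geometrically keeps the number of memory-manager calls small. Checking only every `checkPeriod` elements limits the cost of size estimation.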
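Both BlockTransferService (NettyBlockTransferService) and ExternalBlockStoreClient implement BlockStoreClient. Both support pushing or uploading block data to a remote node and fetching blocks from a remote node.
ExternalBlockStoreClient is mainly used for transferring shuffle data. Its class comment reads: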
Client for reading both RDD blocks and shuffle blocks which points to an external
(outside of executor) server. This is instead of reading blocks directly from other executors
(via BlockTransferService), which has the downside of losing the data if we lose the executors.
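MapOutputTracker follows a master/worker architecture and consists of MapOutputTrackerMaster and MapOutputTrackerWorker.
MapOutputTrackerMaster runs on the driver. DAGScheduler registers the map output statuses of each shuffle with it. ShuffleMapStage uses it to track map output status and thereby decide which tasks need to be rerun.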
/**
 * Driver-side class that keeps track of the location of the map output of a stage.
 *
 * The DAGScheduler uses this class to (de)register map output statuses and to look up statistics
 * for performing locality-aware reduce task scheduling.
 *
 * ShuffleMapStage uses this class for tracking available / missing outputs in order to determine
 * which tasks need to be run.
 */
Besides local method calls, the master also serves queries from workers. These requests are processed in dedicated MessageLoop threads:
private class MessageLoop extends Runnable {
  override def run(): Unit = {
    try {
      while (true) {
        try {
          val data = mapOutputRequests.take()
          if (data == PoisonPill) {
            // Put PoisonPill back so that other MessageLoops can see it.
            mapOutputRequests.offer(PoisonPill)
            return
          }
          val context = data.context
          val shuffleId = data.shuffleId
          val hostPort = context.senderAddress.hostPort
          logDebug("Handling request to send map output locations for shuffle " + shuffleId +
            " to " + hostPort)
          val shuffleStatus = shuffleStatuses.get(shuffleId).head
          context.reply(
            shuffleStatus.serializedMapStatus(broadcastManager, isLocal, minSizeForBroadcast,
              conf))
        } catch {
          case NonFatal(e) => logError(e.getMessage, e)
        }
      }
    } catch {
      case ie: InterruptedException => // exit
    }
  }
}
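The MessageLoop pattern works on its own, outside Spark. Several worker threads drain a shared `LinkedBlockingQueue`. A single shared `PoisonPill` sentinel shuts them down, and each thread puts the pill back before exiting so the remaining threads see it too. The minimal sketch below uses a hypothetical `Request` type whose reply is a plain function, in place of `RpcCallContext`.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical request: which shuffle is asked for, and how to send the answer back.
final case class Request(shuffleId: Int, reply: String => Unit)
// Sentinel that tells every loop to stop; compared by reference below.
val PoisonPill = Request(-1, _ => ())

final class MessageLoop(queue: LinkedBlockingQueue[Request], statuses: Map[Int, String])
    extends Runnable {
  override def run(): Unit =
    try {
      var running = true
      while (running) {
        val data = queue.take() // blocks until a request (or the pill) arrives
        if (data eq PoisonPill) {
          queue.offer(PoisonPill) // put it back so other loops can see it
          running = false
        } else {
          data.reply(statuses.getOrElse(data.shuffleId, "unknown shuffle"))
        }
      }
    } catch {
      case _: InterruptedException => // exit quietly, as the original does
    }
}
```

Putting the pill back means one `put(PoisonPill)` stops any number of loop threads.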
MapOutputTrackerWorker runs on the executor side and is a client of the master. Its sole purpose is to query the shuffle map output status maintained by the master.
/**
 * Executor-side client for fetching map output info from the driver's MapOutputTrackerMaster.
 * Note that this is not used in local-mode; instead, local-mode Executors access the
 * MapOutputTrackerMaster directly (which is possible because the master and worker share a common
 * superclass).
 */
The main method of MapOutputTrackerWorker:
// org.apache.spark.MapOutputTrackerWorker#getMapSizesByExecutorId
// Queries the master for the blocks that a range of map tasks of a given shuffle
// produced for a given range of reduce partitions
override def getMapSizesByExecutorId(
    shuffleId: Int,
    startMapIndex: Int,
    endMapIndex: Int,
    startPartition: Int,
    endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
  logDebug(s"Fetching outputs for shuffle $shuffleId")
  val statuses = getStatuses(shuffleId, conf)
  try {
    val actualEndMapIndex = if (endMapIndex == Int.MaxValue) statuses.length else endMapIndex
    logDebug(s"Convert map statuses for shuffle $shuffleId, " +
      s"mappers $startMapIndex-$actualEndMapIndex, partitions $startPartition-$endPartition")
    MapOutputTracker.convertMapStatuses(
      shuffleId, startPartition, endPartition, statuses, startMapIndex, actualEndMapIndex)
  } catch {
    case e: MetadataFetchFailedException =>
      // We experienced a fetch failure so our mapStatuses cache is outdated; clear it:
      mapStatuses.clear()
      throw e
  }
}
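What `getMapSizesByExecutorId` returns is essentially the map statuses regrouped by location. For the requested map and reduce ranges, it answers which BlockManager holds which shuffle blocks, and how big they are. The simplified sketch of that conversion rests on a few assumptions. `MapStatusLite(location, sizes)` is hypothetical, and `sizes(reduceId)` is the output size for each reduce partition. Block ids are plain `shuffle_<shuffleId>_<mapIndex>_<reduceId>` strings, and empty blocks are skipped. Spark's real `convertMapStatuses` works on MapStatus and ShuffleBlockId and also deals with missing map outputs, which this sketch ignores.

```scala
import scala.collection.mutable

// Hypothetical simplified map status: where a map output lives and its per-reducer sizes.
final case class MapStatusLite(location: String, sizes: Array[Long])

def convert(shuffleId: Int, startPartition: Int, endPartition: Int,
            statuses: Array[MapStatusLite], startMapIndex: Int,
            endMapIndex: Int): Map[String, Seq[(String, Long, Int)]] = {
  // Same convention as above: Int.MaxValue means "up to the last map task".
  val actualEnd = if (endMapIndex == Int.MaxValue) statuses.length else endMapIndex
  val byLocation = mutable.LinkedHashMap.empty[String, mutable.ArrayBuffer[(String, Long, Int)]]
  for (mapIndex <- startMapIndex until actualEnd; part <- startPartition until endPartition) {
    val status = statuses(mapIndex)
    val size = status.sizes(part)
    if (size != 0) { // empty blocks need not be fetched
      val blockId = s"shuffle_${shuffleId}_${mapIndex}_$part"
      byLocation.getOrElseUpdate(status.location, mutable.ArrayBuffer.empty[(String, Long, Int)]) +=
        ((blockId, size, mapIndex))
    }
  }
  byLocation.map { case (loc, blocks) => loc -> blocks.toSeq }.toMap
}
```

A reduce task can then issue one fetch request per location instead of one per block.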
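Original content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
In case of infringement, please contact cloudcommunity@tencent.com to have it removed.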