BlockManagerMaster — BlockManager for Driver
BlockManagerMaster runs on the driver and executors.
BlockManagerMaster uses BlockManagerMasterEndpoint, registered under the BlockManagerMaster RPC endpoint name on the driver (with endpoint references to it on executors), so that executors can send block status updates to it and it can keep track of block statuses.
Note: An instance of BlockManagerMaster is created in SparkEnv (for the driver and executors) and immediately used to create their BlockManagers.
Tip: Enable INFO logging level for the org.apache.spark.storage.BlockManagerMaster logger by adding the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.storage.BlockManagerMaster=INFO
Refer to Logging.
Creating BlockManagerMaster Instance
An instance of BlockManagerMaster requires a BlockManagerMaster RPC endpoint reference, SparkConf, and the isDriver flag to control whether it is created for the driver or executors.
Note: An instance of BlockManagerMaster is created as part of creating an instance of SparkEnv for the driver and executors.
Removing Executor — removeExecutor Method
removeExecutor(execId: String): Unit
removeExecutor posts RemoveExecutor to the BlockManagerMaster RPC endpoint and waits for a response.
If the response is false, a SparkException is thrown with the following message:
BlockManagerMasterEndpoint returned false, expected true.
If all goes fine, you should see the following INFO message in the logs:
INFO BlockManagerMaster: Removed executor [execId]
Note: removeExecutor is executed when DAGScheduler processes the ExecutorLost event.
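A minimal sketch of the "ask and expect true" exchange that removeExecutor performs, assuming a synchronous endpoint. Endpoint, SparkLikeException and tell are hypothetical stand-ins written for this example (not Spark's RpcEndpointRef or SparkException), and the RemoveExecutor case class here is a simplified copy of the message, so the snippet runs without Spark on the classpath.

```scala
// Hypothetical, simplified message type standing in for Spark's RPC messages.
sealed trait Message
final case class RemoveExecutor(execId: String) extends Message

// Hypothetical exception type standing in for SparkException.
final class SparkLikeException(msg: String) extends Exception(msg)

// Hypothetical endpoint that answers every message synchronously with a Boolean.
trait Endpoint {
  def askSync(message: Message): Boolean
}

// Post the message and treat a `false` reply as a failure.
def tell(endpoint: Endpoint, message: Message): Unit =
  if (!endpoint.askSync(message))
    throw new SparkLikeException("BlockManagerMasterEndpoint returned false, expected true.")

def removeExecutor(endpoint: Endpoint, execId: String): Unit = {
  tell(endpoint, RemoveExecutor(execId))
  // Only reached when the endpoint replied `true`.
  println(s"INFO BlockManagerMaster: Removed executor $execId")
}
```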
Removing Block — removeBlock Method
removeBlock(blockId: BlockId): Unit
removeBlock simply posts a blocking RemoveBlock message to the BlockManagerMaster RPC endpoint (and ultimately disregards the response).
Removing RDD Blocks — removeRdd Method
removeRdd(rddId: Int, blocking: Boolean)
removeRdd removes all the blocks of the rddId RDD, possibly in a blocking fashion.
It posts a RemoveRdd(rddId) message to the BlockManagerMaster RPC endpoint on a separate thread.
If there is an issue, you should see the following WARN message in the logs, followed by the entire exception:
WARN Failed to remove RDD [rddId] - [exception]
If it is a blocking operation, it waits for the result for up to spark.rpc.askTimeout (falling back to spark.network.timeout and then to 120 seconds).
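The asynchronous pattern described for removeRdd (post the message, log a warning if the reply fails, and wait only when blocking is true) can be sketched with plain Scala futures. askRemoveRdd, its fake reply and the default 120-second timeout parameter are assumptions for illustration; Spark itself goes through its RPC layer and timeout utilities instead.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical asynchronous "ask": the reply (number of blocks removed per
// executor) is computed on another thread; a negative id simulates a failure.
def askRemoveRdd(rddId: Int): Future[Seq[Int]] = Future {
  if (rddId < 0) throw new IllegalArgumentException(s"bad RDD id $rddId")
  Seq(2, 3)
}

def removeRdd(rddId: Int, blocking: Boolean, timeout: FiniteDuration = 120.seconds): Unit = {
  val future = askRemoveRdd(rddId)
  // A failed reply is logged as a warning; this callback never blocks the caller.
  future.failed.foreach { e =>
    println(s"WARN Failed to remove RDD $rddId - ${e.getMessage}")
  }
  // Only a blocking call waits, and only up to the timeout.
  if (blocking) Await.result(future, timeout)
}
```

In this sketch a blocking call also rethrows the failure from Await.result, in addition to the logged warning; a non-blocking call returns immediately.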
Removing Shuffle Blocks — removeShuffle Method
removeShuffle(shuffleId: Int, blocking: Boolean)
removeShuffle removes all the blocks of the shuffleId shuffle, possibly in a blocking fashion.
It posts a RemoveShuffle(shuffleId) message to the BlockManagerMaster RPC endpoint on a separate thread.
If there is an issue, you should see the following WARN message in the logs, followed by the entire exception:
WARN Failed to remove shuffle [shuffleId] - [exception]
If it is a blocking operation, it waits for the result for up to spark.rpc.askTimeout (falling back to spark.network.timeout and then to 120 seconds).
Note: removeShuffle is used exclusively when ContextCleaner removes a shuffle.
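The timeout lookup order mentioned for the blocking variants can be expressed as a small fallback chain over configuration keys. The conf map, askTimeout and parseSeconds are hypothetical; parseSeconds only accepts whole seconds such as "30s" or "30", whereas Spark's own time-string parsing supports more units.

```scala
import scala.concurrent.duration._

// Hypothetical parser: accepts "30s" or a bare "30" (both mean 30 seconds).
// Other units (e.g. "2min") are deliberately not supported in this sketch.
def parseSeconds(value: String): FiniteDuration =
  value.trim.stripSuffix("s").toLong.seconds

// The first key that is set wins; otherwise the 120-second default applies.
def askTimeout(conf: Map[String, String]): FiniteDuration =
  Seq("spark.rpc.askTimeout", "spark.network.timeout")
    .collectFirst { case key if conf.contains(key) => parseSeconds(conf(key)) }
    .getOrElse(120.seconds)
```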
Removing Broadcast Blocks — removeBroadcast Method
removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean)
removeBroadcast removes all the blocks of the broadcastId broadcast, possibly in a blocking fashion.
It posts a RemoveBroadcast(broadcastId, removeFromMaster) message to the BlockManagerMaster RPC endpoint on a separate thread.
If there is an issue, you should see the following WARN message in the logs, followed by the entire exception:
WARN Failed to remove broadcast [broadcastId] with removeFromMaster = [removeFromMaster] - [exception]
If it is a blocking operation, it waits for the result for up to spark.rpc.askTimeout (falling back to spark.network.timeout and then to 120 seconds).
Stopping BlockManagerMaster — stop Method
stop(): Unit
stop sends a StopBlockManagerMaster message to the BlockManagerMaster RPC endpoint and waits for a response.
Note: It is only executed for the driver.
If all goes fine, you should see the following INFO message in the logs:
INFO BlockManagerMaster: BlockManagerMaster stopped
Otherwise, a SparkException is thrown with the following message:
BlockManagerMasterEndpoint returned false, expected true.
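A sketch of how the isDriver flag given at construction time can gate stop, so that only the driver asks the endpoint to shut down. MasterHandle and the String => Boolean endpoint function are hypothetical simplifications; making repeated stop calls a no-op is a choice made for this example only.

```scala
// Hypothetical handle: the endpoint is modelled as a function from a message
// name to the Boolean reply it sends back.
final class MasterHandle(askSync: String => Boolean, val isDriver: Boolean) {
  private var stopped = false

  // Executors skip the request entirely; only the driver (once) sends it.
  def stop(): Unit =
    if (isDriver && !stopped) {
      if (!askSync("StopBlockManagerMaster"))
        throw new IllegalStateException("BlockManagerMasterEndpoint returned false, expected true.")
      stopped = true
      println("INFO BlockManagerMaster: BlockManagerMaster stopped")
    }
}
```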
Registering BlockManager with Driver — registerBlockManager Method
registerBlockManager(
  blockManagerId: BlockManagerId,
  maxMemSize: Long,
  slaveEndpoint: RpcEndpointRef): BlockManagerId
When called, registerBlockManager prints the following INFO message to the logs:
INFO BlockManagerMaster: Registering BlockManager [blockManagerId]
registerBlockManager then informs the driver about the blockManagerId BlockManager by posting a blocking RegisterBlockManager message to the BlockManagerMaster RPC endpoint, and waits until a confirmation comes.
You should see the following INFO message in the logs:
INFO BlockManagerMaster: Registered BlockManager [updatedId]
Finally, updatedId (the BlockManagerId sent back by the driver) is returned.
Note: registerBlockManager is called while BlockManager initializes (on the driver or executors) or re-registers blocks with the driver.
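The register-then-use-the-returned-id flow can be sketched as follows. BmId, Registry and its topology enrichment are hypothetical; the point is only that the caller adopts whatever id the driver sends back (updatedId) rather than the one it sent.

```scala
import scala.collection.mutable

// Hypothetical, simplified id; Spark's BlockManagerId is a separate class.
final case class BmId(executorId: String, host: String, port: Int, topology: Option[String] = None)

// Hypothetical driver-side registry standing in for the RPC endpoint.
final class Registry(topologyOf: String => Option[String]) {
  private val known = mutable.Map.empty[BmId, Long]

  // Answers a registration: records the id and returns the id the caller
  // should use from now on (in this sketch, enriched with topology info).
  def register(id: BmId, maxMemSize: Long): BmId = {
    val updated = id.copy(topology = topologyOf(id.host))
    known(updated) = maxMemSize
    updated
  }

  def size: Int = known.size
}

def registerBlockManager(registry: Registry, id: BmId, maxMemSize: Long): BmId = {
  println(s"INFO BlockManagerMaster: Registering BlockManager $id")
  val updatedId = registry.register(id, maxMemSize)
  println(s"INFO BlockManagerMaster: Registered BlockManager $updatedId")
  updatedId
}
```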
Sending UpdateBlockInfo to Driver — updateBlockInfo Method
updateBlockInfo(
  blockManagerId: BlockManagerId,
  blockId: BlockId,
  storageLevel: StorageLevel,
  memSize: Long,
  diskSize: Long): Boolean
updateBlockInfo sends a blocking UpdateBlockInfo message to the BlockManagerMaster RPC endpoint and waits for a response.
You should see the following DEBUG message in the logs:
DEBUG BlockManagerMaster: Updated info of block [blockId]
updateBlockInfo returns the response from the BlockManagerMaster RPC endpoint.
Get Block Locations of One Block — getLocations Method
getLocations(blockId: BlockId): Seq[BlockManagerId]
getLocations posts a GetLocations(blockId) message to the BlockManagerMaster RPC endpoint and waits for a response, which becomes the return value.
Get Block Locations for Multiple Blocks — getLocations Method
getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]]
getLocations posts a GetLocationsMultipleBlockIds(blockIds) message to the BlockManagerMaster RPC endpoint and waits for a response, which becomes the return value.
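To make the bookkeeping behind updateBlockInfo and both getLocations variants concrete, here is a hypothetical driver-side map from block ids to the block managers holding them. BlockLocations, BmId and the string-based BlockId alias are assumptions, and treating a block as present when either size is positive is a simplification of this sketch, not necessarily the endpoint's exact rule.

```scala
import scala.collection.mutable

// Hypothetical simplified ids; Spark uses BlockId and BlockManagerId classes.
type BlockId = String
final case class BmId(executorId: String, host: String)

// Hypothetical driver-side bookkeeping: which block managers hold which block.
final class BlockLocations {
  private val locations = mutable.Map.empty[BlockId, mutable.LinkedHashSet[BmId]]

  // Simplification: a positive memory or disk size means the block is held
  // by that block manager; otherwise it is treated as dropped there.
  def updateBlockInfo(bm: BmId, blockId: BlockId, memSize: Long, diskSize: Long): Boolean = {
    val holders = locations.getOrElseUpdate(blockId, mutable.LinkedHashSet.empty[BmId])
    if (memSize > 0 || diskSize > 0) holders += bm else holders -= bm
    if (holders.isEmpty) locations -= blockId
    true
  }

  // Single-block lookup, in registration order.
  def getLocations(blockId: BlockId): Seq[BmId] =
    locations.get(blockId).map(_.toSeq).getOrElse(Seq.empty)

  // Multi-block lookup: one answer per requested block, in request order.
  def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BmId]] =
    blockIds.toIndexedSeq.map(id => getLocations(id))
}
```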
getPeers Method
getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId]
getPeers posts a GetPeers(blockManagerId) message to the BlockManagerMaster RPC endpoint and waits for a response, which becomes the return value.
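A sketch of one plausible GetPeers answer, assuming peers are all other registered block managers except the driver's, and that an unknown requester gets no peers. Treat that reading as an assumption rather than a specification; BmId and the "driver" executor id check are simplifications written for this example.

```scala
// Hypothetical simplified id; Spark's BlockManagerId also carries host and port.
final case class BmId(executorId: String) {
  // Assumption: the driver's block manager uses the executor id "driver".
  def isDriver: Boolean = executorId == "driver"
}

// Assumed semantics: every other registered, non-driver block manager,
// sorted here only to make the result deterministic.
def getPeers(registered: Set[BmId], requester: BmId): Seq[BmId] =
  if (!registered.contains(requester)) Seq.empty
  else registered.filterNot(_.isDriver).filterNot(_ == requester).toSeq.sortBy(_.executorId)
```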
getExecutorEndpointRef Method
getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef]
getExecutorEndpointRef posts a GetExecutorEndpointRef(executorId) message to the BlockManagerMaster RPC endpoint and waits for a response, which becomes the return value.
getMemoryStatus Method
getMemoryStatus: Map[BlockManagerId, (Long, Long)]
getMemoryStatus posts a GetMemoryStatus message to the BlockManagerMaster RPC endpoint and waits for a response, which becomes the return value.
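Each value in the getMemoryStatus map is a pair of numbers; in BlockManagerMasterEndpoint it is the block manager's maximum memory followed by its remaining memory. Assuming that order, the following sketch derives used memory per block manager and cluster-wide totals. BmId, usedMemory and clusterTotals are hypothetical names.

```scala
// Hypothetical simplified id standing in for BlockManagerId.
final case class BmId(executorId: String)

// Assumes each pair is (maximum memory, remaining memory), both in bytes.
def usedMemory(status: Map[BmId, (Long, Long)]): Map[BmId, Long] =
  status.map { case (id, (max, remaining)) => id -> (max - remaining) }

// Sums maximum and remaining memory across all block managers.
def clusterTotals(status: Map[BmId, (Long, Long)]): (Long, Long) =
  status.values.foldLeft((0L, 0L)) { case ((maxAcc, remAcc), (max, remaining)) =>
    (maxAcc + max, remAcc + remaining)
  }
```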
getStorageStatus Method
getStorageStatus: Array[StorageStatus]
getStorageStatus posts a GetStorageStatus message to the BlockManagerMaster RPC endpoint and waits for a response, which becomes the return value.
getBlockStatus Method
getBlockStatus(
  blockId: BlockId,
  askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus]
getBlockStatus posts a GetBlockStatus(blockId, askSlaves) message to the BlockManagerMaster RPC endpoint and waits for a response (of type Map[BlockManagerId, Future[Option[BlockStatus]]]).
It then combines the per-BlockManager future results into a single future of BlockStatus statuses and waits for its result for up to spark.rpc.askTimeout (falling back to spark.network.timeout and then to 120 seconds).
A null result leads to a SparkException with the following message:
BlockManager returned null for BlockStatus query: [blockId]
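The two-step wait in getBlockStatus (one future per BlockManager, combined and awaited as a whole) can be sketched with Future.sequence. BmId, Status and collectStatuses are hypothetical stand-ins; the null check described above is not modelled, since Future.sequence does not produce null here.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical simplified types standing in for BlockManagerId and BlockStatus.
final case class BmId(executorId: String)
final case class Status(memSize: Long, diskSize: Long)

// Waits for all per-BlockManager futures within the timeout and keeps only
// the block managers that actually reported a status for the block.
def collectStatuses(
    response: Map[BmId, Future[Option[Status]]],
    timeout: FiniteDuration): Map[BmId, Status] = {
  val (ids, futures) = response.toSeq.unzip
  val results: Seq[Option[Status]] = Await.result(Future.sequence(futures), timeout)
  ids.zip(results).collect { case (id, Some(status)) => id -> status }.toMap
}
```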
getMatchingBlockIds Method
getMatchingBlockIds(
  filter: BlockId => Boolean,
  askSlaves: Boolean): Seq[BlockId]
getMatchingBlockIds posts a GetMatchingBlockIds(filter, askSlaves) message to the BlockManagerMaster RPC endpoint and waits for up to spark.rpc.askTimeout (falling back to spark.network.timeout and then to 120 seconds) for a response, which becomes the result.
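A sketch of how a filter: BlockId => Boolean predicate and the askSlaves flag might combine. The block id strings follow the rdd_<rddId>_<partition> naming Spark uses for RDD blocks; known, executorLocal and blocksOfRdd are hypothetical, and the merge-then-deduplicate behaviour is an assumption of this example.

```scala
// Hypothetical: block ids are plain names such as "rdd_3_0" (RDD 3,
// partition 0) or "broadcast_1"; Spark models them as BlockId subclasses.
type BlockId = String

// Assumed model: the driver knows some block ids; with askSlaves = true each
// executor's local listing is consulted as well, and duplicates are removed.
def getMatchingBlockIds(
    filter: BlockId => Boolean,
    askSlaves: Boolean,
    known: Seq[BlockId],
    executorLocal: Seq[Seq[BlockId]]): Seq[BlockId] = {
  val candidates = if (askSlaves) known ++ executorLocal.flatten else known
  candidates.filter(filter).distinct
}

// A predicate selecting every block of one RDD, by naming convention.
def blocksOfRdd(rddId: Int): BlockId => Boolean = _.startsWith(s"rdd_${rddId}_")
```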
hasCachedBlocks Method
hasCachedBlocks(executorId: String): Boolean
hasCachedBlocks posts a HasCachedBlocks(executorId) message to the BlockManagerMaster RPC endpoint and waits for a response, which becomes the result.