log4j.logger.org.apache.spark.storage.BlockManagerMaster=INFO
BlockManagerMaster — BlockManager for Driver
BlockManagerMaster runs on the driver and executors.
BlockManagerMaster uses a BlockManagerMasterEndpoint that is registered under the BlockManagerMaster RPC endpoint name on the driver (executors hold references to that endpoint). Executors send block status updates to it, which lets the driver keep track of block statuses.
Note: An instance of BlockManagerMaster is created in SparkEnv (for the driver and executors), and immediately used to create their BlockManagers.
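Tip: Enable INFO logging level for the org.apache.spark.storage.BlockManagerMaster logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.storage.BlockManagerMaster=INFO
Refer to Logging.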
Creating BlockManagerMaster Instance
An instance of BlockManagerMaster requires a BlockManagerMaster RPC endpoint reference, SparkConf, and the isDriver flag to control whether it is created for the driver or executors.
Note: An instance of BlockManagerMaster is created as part of creating an instance of SparkEnv for the driver and executors.
Removing Executor — removeExecutor Method
removeExecutor(execId: String): Unit
removeExecutor posts RemoveExecutor to BlockManagerMaster RPC endpoint and waits for a response.
If the response is false, a SparkException is thrown with the following message:
BlockManagerMasterEndpoint returned false, expected true.
If all goes fine, you should see the following INFO message in the logs:
INFO BlockManagerMaster: Removed executor [execId]
Note: removeExecutor is executed when DAGScheduler processes ExecutorLost event.
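The ask-and-verify behaviour described above can be sketched as follows. This is a minimal model, not Spark's code: RemoveExecutorSketch, Master, SketchException, and the ask function are hypothetical stand-ins for the RPC endpoint reference and SparkException, and println stands in for the logger.

```scala
object RemoveExecutorSketch {
  // Hypothetical stand-ins for Spark's internal message and exception types.
  final case class RemoveExecutor(execId: String)
  final class SketchException(message: String) extends Exception(message)

  // `ask` models a blocking request to the driver endpoint that answers true or false.
  final class Master(ask: RemoveExecutor => Boolean) {
    def removeExecutor(execId: String): Unit = {
      // A false answer is treated as a failure and reported with the documented message.
      if (!ask(RemoveExecutor(execId))) {
        throw new SketchException("BlockManagerMasterEndpoint returned false, expected true.")
      }
      println(s"INFO BlockManagerMaster: Removed executor $execId")
    }
  }
}
```

With an endpoint that answers true, the call returns normally and prints the INFO line; with one that answers false, it throws.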
Removing Block — removeBlock Method
removeBlock(blockId: BlockId): Unit
removeBlock simply posts a blocking RemoveBlock message to BlockManagerMaster RPC endpoint (and ultimately disregards the response).
Removing RDD Blocks — removeRdd Method
removeRdd(rddId: Int, blocking: Boolean)
removeRdd removes all the blocks of rddId RDD, possibly in a blocking fashion.
It posts a RemoveRdd(rddId) message to BlockManagerMaster RPC endpoint on a separate thread.
If there is an issue, you should see the following WARN message in the logs and the entire exception:
WARN Failed to remove RDD [rddId] - [exception]
If it is a blocking operation, it waits for the result for up to the RPC ask timeout, which is read from spark.rpc.askTimeout, then spark.network.timeout, and otherwise defaults to 120 secs.
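The post-asynchronously, log-on-failure, optionally-wait flow of removeRdd can be sketched as below. This is a simplified model under stated assumptions: RemoveRddSketch, the ask function, the Seq[Int] reply type, and the in-memory warnings queue are hypothetical and stand in for the RPC endpoint and the logger.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object RemoveRddSketch {
  // Hypothetical stand-in for the message posted to the driver endpoint.
  final case class RemoveRdd(rddId: Int)

  // Collects WARN lines so they can be inspected; a real logger would be used instead.
  val warnings = new java.util.concurrent.ConcurrentLinkedQueue[String]()

  // `ask` models the asynchronous request to the driver endpoint (an assumption of this sketch).
  def removeRdd(rddId: Int, blocking: Boolean, timeout: FiniteDuration)(
      ask: RemoveRdd => Future[Seq[Int]])(implicit ec: ExecutionContext): Unit = {
    val future = ask(RemoveRdd(rddId))
    // Record a warning when the request fails; this callback runs asynchronously.
    future.failed.foreach { e =>
      warnings.add(s"WARN Failed to remove RDD $rddId - ${e.getMessage}")
    }
    // Only a blocking call waits for the outcome, and only up to the timeout.
    if (blocking) Await.result(future, timeout)
  }
}
```

In this sketch a non-blocking call returns immediately even if the request later fails, while a blocking call surfaces the failure to the caller.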
Removing Shuffle Blocks — removeShuffle Method
removeShuffle(shuffleId: Int, blocking: Boolean)
removeShuffle removes all the blocks of shuffleId shuffle, possibly in a blocking fashion.
It posts a RemoveShuffle(shuffleId) message to BlockManagerMaster RPC endpoint on a separate thread.
If there is an issue, you should see the following WARN message in the logs and the entire exception:
WARN Failed to remove shuffle [shuffleId] - [exception]
If it is a blocking operation, it waits for the result for up to the RPC ask timeout, which is read from spark.rpc.askTimeout, then spark.network.timeout, and otherwise defaults to 120 secs.
Note: removeShuffle is used exclusively when ContextCleaner removes a shuffle.
Removing Broadcast Blocks — removeBroadcast Method
removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean)
removeBroadcast removes all the blocks of broadcastId broadcast, possibly in a blocking fashion.
It posts a RemoveBroadcast(broadcastId, removeFromMaster) message to BlockManagerMaster RPC endpoint on a separate thread.
If there is an issue, you should see the following WARN message in the logs and the entire exception:
WARN Failed to remove broadcast [broadcastId] with removeFromMaster = [removeFromMaster] - [exception]
If it is a blocking operation, it waits for the result for up to the RPC ask timeout, which is read from spark.rpc.askTimeout, then spark.network.timeout, and otherwise defaults to 120 secs.
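The order in which the timeout settings are consulted can be illustrated with a small lookup. This is a sketch only: AskTimeoutSketch is a hypothetical name, a plain Map stands in for SparkConf, and values are treated as whole seconds for simplicity.

```scala
import scala.concurrent.duration._

object AskTimeoutSketch {
  // Keys are tried in order; the first one present in the configuration wins.
  val timeoutKeys: Seq[String] = Seq("spark.rpc.askTimeout", "spark.network.timeout")
  val defaultTimeout: FiniteDuration = 120.seconds

  // `conf` is a plain Map standing in for SparkConf (an assumption of this sketch).
  def askTimeout(conf: Map[String, String]): FiniteDuration =
    timeoutKeys
      .find(conf.contains)       // first configured key, in priority order
      .map(conf)                 // look up its value
      .map(_.toLong.seconds)     // values are whole seconds in this sketch
      .getOrElse(defaultTimeout) // fall back to 120 secs when neither key is set
}
```

For example, a configuration that sets only spark.network.timeout to 300 resolves to 300 seconds, and an empty configuration resolves to 120 seconds.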
Stopping BlockManagerMaster — stop Method
stop(): Unit
stop sends a StopBlockManagerMaster message to BlockManagerMaster RPC endpoint and waits for a response.
Note: stop is only executed for the driver.
If all goes fine, you should see the following INFO message in the logs:
INFO BlockManagerMaster: BlockManagerMaster stopped
Otherwise, a SparkException is thrown with the following message:
BlockManagerMasterEndpoint returned false, expected true.
Registering BlockManager to Driver — registerBlockManager Method
registerBlockManager(
  blockManagerId: BlockManagerId,
  maxMemSize: Long,
  slaveEndpoint: RpcEndpointRef): BlockManagerId
When called, registerBlockManager prints the following INFO message to the logs:
INFO BlockManagerMaster: Registering BlockManager [blockManagerId]
registerBlockManager then informs the driver about the BlockManager identified by blockManagerId by posting a blocking RegisterBlockManager message to BlockManagerMaster RPC endpoint, and waits for the confirmation.
You should see the following INFO message in the logs:
INFO BlockManagerMaster: Registered BlockManager [updatedId]
registerBlockManager then returns updatedId.
Note: registerBlockManager is called while BlockManager initializes (on the driver or executors) or re-registers blocks with the driver.
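The registration round trip described above can be sketched as follows. The types are simplified, hypothetical stand-ins (the slaveEndpoint parameter is left out and the topology field is only an illustration of how the driver's answer might differ from the id that was sent), and println stands in for the logger.

```scala
object RegisterSketch {
  // Hypothetical, simplified stand-ins for Spark's BlockManagerId and RegisterBlockManager.
  final case class BlockManagerId(
      executorId: String,
      host: String,
      port: Int,
      topology: Option[String] = None)
  final case class RegisterBlockManager(id: BlockManagerId, maxMemSize: Long)

  // `ask` models the blocking request; the driver answers with the id to use from now on.
  def registerBlockManager(id: BlockManagerId, maxMemSize: Long)(
      ask: RegisterBlockManager => BlockManagerId): BlockManagerId = {
    println(s"INFO BlockManagerMaster: Registering BlockManager $id")
    val updatedId = ask(RegisterBlockManager(id, maxMemSize))
    println(s"INFO BlockManagerMaster: Registered BlockManager $updatedId")
    updatedId
  }
}
```

The caller is expected to keep using the returned id rather than the one it passed in, since the two may differ.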
Sending UpdateBlockInfo to Driver — updateBlockInfo Method
updateBlockInfo(
  blockManagerId: BlockManagerId,
  blockId: BlockId,
  storageLevel: StorageLevel,
  memSize: Long,
  diskSize: Long): Boolean
updateBlockInfo sends a blocking UpdateBlockInfo message to BlockManagerMaster RPC endpoint and waits for a response.
You should see the following DEBUG message in the logs:
DEBUG BlockManagerMaster: Updated info of block [blockId]
updateBlockInfo returns the response from the BlockManagerMaster RPC endpoint.
Get Block Locations of One Block — getLocations Method
getLocations(blockId: BlockId): Seq[BlockManagerId]
getLocations posts a GetLocations(blockId) message to BlockManagerMaster RPC endpoint and waits for a response which becomes the return value.
Get Block Locations for Multiple Blocks — getLocations Method
getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]]
getLocations posts a GetLocationsMultipleBlockIds(blockIds) message to BlockManagerMaster RPC endpoint and waits for a response which becomes the return value.
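The shapes of the two getLocations results can be illustrated with a toy driver-side lookup. This is a sketch under assumptions: LocationsSketch and Driver are hypothetical, block and block manager ids are plain strings, and returning an empty sequence for an unknown block is a choice made for this example.

```scala
object LocationsSketch {
  type BlockId = String        // hypothetical simplification of Spark's BlockId
  type BlockManagerId = String // hypothetical simplification of Spark's BlockManagerId

  // Driver-side view: which block managers hold which block.
  final class Driver(locations: Map[BlockId, Seq[BlockManagerId]]) {
    // Single block: the block managers that hold it (empty when the block is unknown).
    def getLocations(blockId: BlockId): Seq[BlockManagerId] =
      locations.getOrElse(blockId, Seq.empty)

    // Multiple blocks: one entry per requested block, in the same order as `blockIds`.
    def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] =
      blockIds.map(id => getLocations(id)).toIndexedSeq
  }
}
```

The positional correspondence between the input array and the resulting IndexedSeq is what lets a caller match each block to its locations.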
getPeers Method
getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId]
getPeers posts a GetPeers(blockManagerId) message to BlockManagerMaster RPC endpoint and waits for a response which becomes the return value.
getExecutorEndpointRef Method
getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef]
getExecutorEndpointRef posts GetExecutorEndpointRef(executorId) message to BlockManagerMaster RPC endpoint and waits for a response which becomes the return value.
getMemoryStatus Method
getMemoryStatus: Map[BlockManagerId, (Long, Long)]
getMemoryStatus posts a GetMemoryStatus message to BlockManagerMaster RPC endpoint and waits for a response which becomes the return value.
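As an illustration of consuming the returned map, the sketch below derives the memory in use per block manager. It assumes that each pair is (maximum memory, remaining memory) in bytes; MemoryStatusSketch, usedMemory, and the string ids are hypothetical.

```scala
object MemoryStatusSketch {
  type BlockManagerId = String // hypothetical simplification of Spark's BlockManagerId

  // Assumes each value is (maximum memory, remaining memory) in bytes.
  def usedMemory(status: Map[BlockManagerId, (Long, Long)]): Map[BlockManagerId, Long] =
    status.map { case (id, (max, remaining)) => (id, max - remaining) }
}
```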
getStorageStatus Method
getStorageStatus: Array[StorageStatus]
getStorageStatus posts a GetStorageStatus message to BlockManagerMaster RPC endpoint and waits for a response which becomes the return value.
getBlockStatus Method
getBlockStatus(
  blockId: BlockId,
  askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus]
getBlockStatus posts a GetBlockStatus(blockId, askSlaves) message to BlockManagerMaster RPC endpoint and waits for a response (of type Map[BlockManagerId, Future[Option[BlockStatus]]]).
It then combines the futures into a sequence of BlockStatus results and waits for it for up to the RPC ask timeout, which is read from spark.rpc.askTimeout, then spark.network.timeout, and otherwise defaults to 120 secs.
No result leads to a SparkException with the following message:
BlockManager returned null for BlockStatus query: [blockId]
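Collecting the per-block-manager futures into a single result can be sketched as below. This is a simplified model, not Spark's code: BlockStatusSketch, the string ids, the two-field BlockStatus, and IllegalStateException (used in place of SparkException) are assumptions of this example, and the response map is passed in directly rather than obtained over RPC.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BlockStatusSketch {
  type BlockManagerId = String                                // hypothetical simplification
  final case class BlockStatus(memSize: Long, diskSize: Long) // hypothetical simplification

  def getBlockStatus(
      blockId: String,
      response: Map[BlockManagerId, Future[Option[BlockStatus]]],
      timeout: FiniteDuration)(implicit ec: ExecutionContext): Map[BlockManagerId, BlockStatus] = {
    val (ids, futures) = response.toSeq.unzip
    // Turn the sequence of futures into one future of all answers, then wait once.
    val statuses = Await.result(Future.sequence(futures), timeout)
    if (statuses == null) {
      throw new IllegalStateException(s"BlockManager returned null for BlockStatus query: $blockId")
    }
    // Keep only the block managers that actually reported a status for the block.
    ids.zip(statuses).collect { case (id, Some(status)) => (id, status) }.toMap
  }
}
```

Block managers that answer with None are simply absent from the resulting map in this sketch.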
getMatchingBlockIds Method
getMatchingBlockIds(
  filter: BlockId => Boolean,
  askSlaves: Boolean): Seq[BlockId]
getMatchingBlockIds posts a GetMatchingBlockIds(filter, askSlaves) message to BlockManagerMaster RPC endpoint and waits for a response, which becomes the result. It waits for up to the RPC ask timeout, which is read from spark.rpc.askTimeout, then spark.network.timeout, and otherwise defaults to 120 secs.
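The role of the filter function can be illustrated with a toy version that selects block ids by predicate. This is a sketch under assumptions: MatchingBlocksSketch is hypothetical, block ids are plain strings, and askSlaves is modelled as choosing between the driver's own bookkeeping and what the executors report.

```scala
object MatchingBlocksSketch {
  type BlockId = String // hypothetical simplification of Spark's BlockId

  def getMatchingBlockIds(
      driverBlocks: Seq[BlockId],
      executorBlocks: Seq[Seq[BlockId]],
      filter: BlockId => Boolean,
      askSlaves: Boolean): Seq[BlockId] = {
    // Assumption: askSlaves decides whose view of the blocks is consulted.
    val candidates = if (askSlaves) executorBlocks.flatten else driverBlocks
    candidates.filter(filter)
  }
}
```

A filter such as id => id.startsWith("rdd_3_") would select the blocks of a single RDD under this naming scheme.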
hasCachedBlocks Method
hasCachedBlocks(executorId: String): Boolean
hasCachedBlocks posts a HasCachedBlocks(executorId) message to BlockManagerMaster RPC endpoint and waits for a response which becomes the result.