BlockManagerMasterEndpoint — BlockManagerMaster RPC Endpoint

BlockManagerMasterEndpoint is the RpcEndpoint that BlockManagerMaster uses to track the status of the BlockManagers on the executors of a Spark application.

Note
BlockManagerMasterEndpoint is registered as the BlockManagerMaster RPC endpoint when SparkEnv is created.

Table 1. BlockManagerMasterEndpoint Internal Registries and Counters
Name Description

blockLocations

Collection of BlockIds and their locations (as BlockManagerId).

Used in removeRdd to remove the blocks of an RDD, in removeBlockManager to remove blocks after a BlockManager has been removed, and in removeBlockFromWorkers, updateBlockInfo, and getLocations.

Tip

Enable INFO logging level for org.apache.spark.storage.BlockManagerMasterEndpoint logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.storage.BlockManagerMasterEndpoint=INFO

Refer to Logging.

Removing Shuffle Blocks — removeShuffle Internal Method

Caution
FIXME

UpdateBlockInfo

class UpdateBlockInfo(
  var blockManagerId: BlockManagerId,
  var blockId: BlockId,
  var storageLevel: StorageLevel,
  var memSize: Long,
  var diskSize: Long)
Caution
FIXME

RemoveExecutor

RemoveExecutor(execId: String)

When RemoveExecutor is received, executor execId is removed and the response true is sent back.

Note
RemoveExecutor is posted when BlockManagerMaster removes an executor.

BlockManagerHeartbeat

Caution
FIXME

GetLocations

GetLocations(blockId: BlockId)

When GetLocations comes in, the internal getLocations method is executed and the result becomes the response sent back.

Note
GetLocations is used to get the block locations of a single block.
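
The request-reply handling described for RemoveExecutor and GetLocations can be sketched as a pattern match over incoming messages. This is a minimal illustration, not Spark's actual code: BlockId, BlockManagerId and the message classes are simplified stand-ins, and EndpointSketch, its locations map and its removed list are hypothetical names introduced only for this example.

```scala
// Simplified stand-ins for Spark's types (illustration only).
final case class BlockId(name: String)
final case class BlockManagerId(executorId: String, host: String, port: Int)

// Simplified versions of two of the messages described above.
sealed trait ToBlockManagerMaster
final case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
final case class GetLocations(blockId: BlockId) extends ToBlockManagerMaster

// Hypothetical endpoint: takes a message and returns the reply to send back.
class EndpointSketch(locations: Map[BlockId, Seq[BlockManagerId]]) {
  // Stand-in for removeExecutor: remembers which executors were removed.
  var removed: List[String] = Nil

  def receiveAndReply(message: ToBlockManagerMaster): Any = message match {
    case RemoveExecutor(execId) =>
      removed = execId :: removed
      true // the response sent back for RemoveExecutor
    case GetLocations(blockId) =>
      // the lookup result becomes the response
      locations.getOrElse(blockId, Seq.empty)
  }
}
```

In this sketch the reply is simply the return value of receiveAndReply; in Spark the reply goes back to the sender over RPC.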

RegisterBlockManager Event

RegisterBlockManager(
  blockManagerId: BlockManagerId,
  maxMemSize: Long,
  sender: RpcEndpointRef)

When RegisterBlockManager is received, the internal register method is executed.

Note
RegisterBlockManager is posted when BlockManagerMaster registers a BlockManager.

register Method

register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit

register records the current time and registers the BlockManager by id if it has not already been registered (using the internal blockManagerInfo registry).

Note
register is executed when RegisterBlockManager has been received.

Registering a BlockManager can only happen once for an executor (identified by BlockManagerId.executorId using the internal blockManagerIdByExecutor registry).

If another BlockManager has already been registered for the executor, you should see the following ERROR message in the logs:

ERROR Got two different block manager registrations on same executor - will replace old one [oldId] with new one [id]

You should see the following INFO message in the logs:

INFO Registering block manager [hostPort] with [bytes] RAM, [id]

The BlockManager is recorded in the internal registries: blockManagerIdByExecutor and blockManagerInfo.

Caution
FIXME Why does blockManagerInfo require a new System.currentTimeMillis() since time was already recorded?

In either case, SparkListenerBlockManagerAdded is posted (to listenerBus).

Note
The method can only be executed on the driver where listenerBus is available.
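
The registration steps above can be sketched as follows. This is a simplified model under stated assumptions, not the actual implementation: BlockManagerInfo here is a hypothetical trimmed-down record, the added buffer stands in for listenerBus, logging and the RpcEndpointRef parameter are omitted, and the replaced BlockManager is only dropped from the two registries.

```scala
import scala.collection.mutable

// Simplified stand-in for Spark's BlockManagerId (illustration only).
final case class BlockManagerId(executorId: String, host: String, port: Int)

// Hypothetical, trimmed-down per-BlockManager record.
final case class BlockManagerInfo(id: BlockManagerId, registeredAt: Long, maxMemSize: Long)

class RegistrySketch {
  val blockManagerIdByExecutor = mutable.HashMap.empty[String, BlockManagerId]
  val blockManagerInfo = mutable.HashMap.empty[BlockManagerId, BlockManagerInfo]
  // Stand-in for listenerBus: collects the ids announced as added.
  val added = mutable.ListBuffer.empty[BlockManagerId]

  def register(id: BlockManagerId, maxMemSize: Long): Unit = {
    val time = System.currentTimeMillis()
    if (!blockManagerInfo.contains(id)) {
      // One BlockManager per executor: an older registration is replaced.
      blockManagerIdByExecutor.get(id.executorId).foreach { oldId =>
        // the ERROR message would be logged here
        blockManagerIdByExecutor.remove(oldId.executorId)
        blockManagerInfo.remove(oldId)
      }
      // the INFO message would be logged here
      blockManagerIdByExecutor(id.executorId) = id
      blockManagerInfo(id) = BlockManagerInfo(id, time, maxMemSize)
    }
    // Announced in either case, as described above.
    added += id
  }
}
```

Registering the same id twice leaves the registries unchanged, while a second id for the same executor replaces the first.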
Caution
FIXME Describe listenerBus + omnigraffle it.

Other RPC Messages

  • GetLocationsMultipleBlockIds

  • GetPeers

  • GetRpcHostPortForExecutor

  • GetMemoryStatus

  • GetStorageStatus

  • GetBlockStatus

  • GetMatchingBlockIds

  • RemoveRdd

  • RemoveShuffle

  • RemoveBroadcast

  • RemoveBlock

  • StopBlockManagerMaster

  • BlockManagerHeartbeat

  • HasCachedBlocks

Removing Executor — removeExecutor Method

removeExecutor(execId: String)

removeExecutor prints the following INFO message to the logs:

INFO BlockManagerMasterEndpoint: Trying to remove executor [execId] from BlockManagerMaster.

If the execId executor is found in the internal blockManagerIdByExecutor registry, the BlockManager for the executor is removed.

Note
removeExecutor is executed when BlockManagerMasterEndpoint receives a RemoveExecutor message, or when it registers a new BlockManager for an executor that already had one registered (the old BlockManager is replaced by the new one).

Removing BlockManager — removeBlockManager Method

removeBlockManager(blockManagerId: BlockManagerId)

removeBlockManager looks up blockManagerId, removes the executor it was running on from the internal blockManagerIdByExecutor registry, and removes blockManagerId itself from blockManagerInfo.

Note
removeBlockManager is a private helper method that is exclusively used while removing an executor.

It then goes over all the blocks of the BlockManager and, for each block, removes the BlockManager from the block's locations in the blockLocations registry.

You should then see the following INFO message in the logs:

INFO BlockManagerMasterEndpoint: Removing block manager [blockManagerId]
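
The way removeExecutor delegates to removeBlockManager, and the cleanup of the three registries, can be sketched as below. This is a minimal model with assumptions: blockManagerInfo maps directly to the set of blocks a BlockManager holds (a simplification), logging is omitted, and dropping a block that has no locations left is an assumption of this sketch rather than something stated above.

```scala
import scala.collection.mutable

// Simplified stand-ins for Spark's types (illustration only).
final case class BlockId(name: String)
final case class BlockManagerId(executorId: String, host: String, port: Int)

class RemovalSketch {
  val blockManagerIdByExecutor = mutable.HashMap.empty[String, BlockManagerId]
  // Simplification: maps a BlockManager straight to the blocks it holds.
  val blockManagerInfo = mutable.HashMap.empty[BlockManagerId, mutable.HashSet[BlockId]]
  val blockLocations = mutable.HashMap.empty[BlockId, mutable.HashSet[BlockManagerId]]

  // Removes the BlockManager of the executor, if one is registered.
  def removeExecutor(execId: String): Unit =
    blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)

  private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
    blockManagerIdByExecutor.remove(blockManagerId.executorId)
    val blocks = blockManagerInfo
      .remove(blockManagerId)
      .getOrElse(mutable.HashSet.empty[BlockId])
    blocks.foreach { blockId =>
      blockLocations.get(blockId).foreach { locations =>
        locations -= blockManagerId
        // Assumption: a block with no remaining locations is dropped entirely.
        if (locations.isEmpty) blockLocations.remove(blockId)
      }
    }
  }
}
```

Removing an unknown executor is a no-op in this sketch, which matches the "if found" condition described for removeExecutor.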

Get Block Locations — getLocations Method

getLocations(blockId: BlockId): Seq[BlockManagerId]

When executed, getLocations looks up blockId in the blockLocations internal registry and returns the locations (as a collection of BlockManagerId) or an empty collection.
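
The lookup can be sketched with a map from block to the set of BlockManagers holding it. This is an illustration under assumptions: the case classes are simplified stand-ins for Spark's types, LocationsSketch is a hypothetical name, and the choice of Scala mutable collections for the registry is an assumption of this sketch.

```scala
import scala.collection.mutable

// Simplified stand-ins for Spark's types (illustration only).
final case class BlockId(name: String)
final case class BlockManagerId(executorId: String, host: String, port: Int)

class LocationsSketch {
  // Registry of blocks and the BlockManagers that hold them.
  val blockLocations = mutable.HashMap.empty[BlockId, mutable.HashSet[BlockManagerId]]

  // Returns the known locations of blockId, or an empty collection.
  def getLocations(blockId: BlockId): Seq[BlockManagerId] =
    blockLocations.get(blockId).map(_.toSeq).getOrElse(Seq.empty)
}
```

Returning an empty collection for an unknown block lets callers treat "no locations" and "unknown block" the same way, without null checks.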
