CoarseGrainedSchedulerBackend

CoarseGrainedSchedulerBackend is a SchedulerBackend.

CoarseGrainedSchedulerBackend is an ExecutorAllocationClient.

CoarseGrainedSchedulerBackend is responsible for requesting resources from a cluster manager for executors that it in turn uses to launch tasks (on coarse-grained executors).

CoarseGrainedSchedulerBackend holds executors for the duration of the Spark job rather than relinquishing executors whenever a task is done and asking the scheduler to launch a new executor for each new task.

Caution
FIXME Picture with dependencies

It registers the CoarseGrainedScheduler RPC endpoint that executors use for RPC communication.

Table 1. Built-In CoarseGrainedSchedulerBackends per Cluster Environment

| Cluster Environment | CoarseGrainedSchedulerBackend |
|---------------------|-------------------------------|
| Spark Standalone | StandaloneSchedulerBackend |
| Spark on YARN | YarnSchedulerBackend |
| Spark on Mesos | MesosCoarseGrainedSchedulerBackend |

Note
CoarseGrainedSchedulerBackend is only created indirectly through built-in implementations per cluster environment.
Table 2. CoarseGrainedSchedulerBackend’s Internal Properties

| Name | Initial Value | Description |
|------|---------------|-------------|
| maxRpcMessageSize | spark.rpc.message.maxSize (but not greater than 2047) | Maximum RPC message size in MB. When above 2047 MB you should see the following IllegalArgumentException: "spark.rpc.message.maxSize should not be greater than 2047 MB" |
| executorDataMap | empty | Registry of ExecutorData by executor id. ExecutorData holds an executor's endpoint reference, address, host, the number of free and total CPU cores, and the URL of its execution logs. An element is added when DriverEndpoint receives a RegisterExecutor message and removed when DriverEndpoint receives a RemoveExecutor message or a remote host (with one or many executors) disconnects. |
| totalCoreCount | 0 | Total number of CPU cores, i.e. the sum of all the cores on all executors. |
| totalRegisteredExecutors | 0 | Total number of registered executors. |
| driverEndpoint | (uninitialized) | CoarseGrainedScheduler RPC endpoint. |
| _minRegisteredRatio | spark.scheduler.minRegisteredResourcesRatio | |
| maxRegisteredWaitingTimeMs | spark.scheduler.maxRegisteredResourcesWaitingTime | |
| currentExecutorIdCounter | | The last (highest) identifier of all allocated executors. Used exclusively in YarnSchedulerEndpoint to respond to the RetrieveLastAllocatedExecutorId message. |
| numPendingExecutors | 0 | |
| executorsPendingToRemove | empty | Executors marked as removed but whose removal has not yet been confirmed by the cluster manager. |
| hostToLocalTaskCount | empty | Registry of hostnames and the number of tasks that may run on them. |
| createTime | Current time | The time CoarseGrainedSchedulerBackend was created. |
| localityAwareTasks | 0 | Number of pending tasks…​FIXME |
| defaultAskTimeout | spark.rpc.askTimeout or spark.network.timeout or 120s | Default timeout for blocking RPC messages (aka ask messages). |
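
The cap on maxRpcMessageSize can be sketched as follows (an illustrative snippet, not Spark's source; the property name, the 128 MB default and the 2047 MB limit come from the tables in this document):

// Read the maximum RPC message size (in MB) and enforce the 2047 MB cap.
def maxRpcMessageSize(conf: Map[String, String]): Int = {
  val sizeInMb = conf.get("spark.rpc.message.maxSize").map(_.toInt).getOrElse(128)
  if (sizeInMb > 2047)
    throw new IllegalArgumentException(
      "spark.rpc.message.maxSize should not be greater than 2047 MB")
  sizeInMb
}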

Tip

Enable INFO or DEBUG logging level for org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend=DEBUG

Refer to Logging.

Making Fake Resource Offers on Executors — makeOffers Internal Methods

makeOffers(): Unit
makeOffers(executorId: String): Unit

makeOffers takes the active executors (out of the executorDataMap internal registry) and creates WorkerOffer resource offers for each (one per executor with the executor’s id, host and free cores).

Caution
Only free cores are considered in making offers. Memory is not! Why?!

It then requests TaskSchedulerImpl to process the resource offers to create a collection of TaskDescription collections that it in turn uses to launch tasks.
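
A simplified, self-contained sketch of that flow follows; WorkerOffer and ExecutorData are stand-ins for Spark's internal classes, reduced to the fields mentioned above.

case class ExecutorData(executorHost: String, freeCores: Int)
case class WorkerOffer(executorId: String, host: String, cores: Int)

// Build one resource offer per active executor, using only its free cores.
def makeOffers(
    executorDataMap: Map[String, ExecutorData],
    isExecutorActive: String => Boolean): Seq[WorkerOffer] =
  executorDataMap.toSeq.collect {
    case (id, data) if isExecutorActive(id) =>
      WorkerOffer(id, data.executorHost, data.freeCores)
  }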

Creating CoarseGrainedSchedulerBackend Instance

CoarseGrainedSchedulerBackend takes the following when created:

  1. TaskSchedulerImpl

  2. RpcEnv

CoarseGrainedSchedulerBackend initializes the internal registries and counters.

Getting Executor Ids — getExecutorIds Method

When called, getExecutorIds simply returns executor ids from the internal executorDataMap registry.

Note
It is called when SparkContext calculates executor ids.
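
In essence (a one-method sketch over the executorDataMap registry from Table 2):

def getExecutorIds(executorDataMap: Map[String, AnyRef]): Seq[String] =
  executorDataMap.keys.toSeq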

CoarseGrainedSchedulerBackend Contract

class CoarseGrainedSchedulerBackend {
  def minRegisteredRatio: Double
  def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint
  def reset(): Unit
  def sufficientResourcesRegistered(): Boolean
  def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean]
  def doKillExecutors(executorIds: Seq[String]): Future[Boolean]
}
Note
CoarseGrainedSchedulerBackend is a private[spark] contract.
Table 3. FIXME Contract

| Method | Description |
|--------|-------------|
| minRegisteredRatio | Ratio between 0 and 1 (inclusive). Controlled by spark.scheduler.minRegisteredResourcesRatio. |
| createDriverEndpoint | FIXME |
| reset | FIXME |
| doRequestTotalExecutors | FIXME |
| doKillExecutors | FIXME |
| sufficientResourcesRegistered | Always positive, i.e. true, by default, which means that sufficient resources are available. Used when CoarseGrainedSchedulerBackend checks if sufficient compute resources are available. |
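
The two defaults above can be sketched as follows (illustrative only; the real code reads spark.scheduler.minRegisteredResourcesRatio from SparkConf):

// minRegisteredRatio defaults to 0 and is capped at 1.
def minRegisteredRatio(conf: Map[String, String]): Double =
  math.min(1.0,
    conf.get("spark.scheduler.minRegisteredResourcesRatio").map(_.toDouble).getOrElse(0.0))

// sufficientResourcesRegistered holds unless a subclass overrides it.
def sufficientResourcesRegistered(): Boolean = true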

numExistingExecutors Method

Caution
FIXME

killExecutors Methods

Caution
FIXME

applicationId Method

Caution
FIXME

getDriverLogUrls Method

Caution
FIXME

applicationAttemptId Method

Caution
FIXME

Requesting Additional Executors — requestExecutors Method

requestExecutors(numAdditionalExecutors: Int): Boolean

requestExecutors is a "decorator" method that ultimately calls a cluster-specific doRequestTotalExecutors method and returns whether the request was acknowledged or not (it is assumed false by default).

Note
requestExecutors method is a part of ExecutorAllocationClient Contract that SparkContext uses for requesting additional executors (as a part of a developer API for dynamic allocation of executors).

When called, you should see the following INFO message followed by a DEBUG message in the logs:

INFO Requesting [numAdditionalExecutors] additional executor(s) from the cluster manager
DEBUG Number of pending executors is now [numPendingExecutors]

numPendingExecutors is increased by the input numAdditionalExecutors.

requestExecutors then requests executors from the cluster manager to reflect the current computation needs. The "new executor total" is the sum of the internal numExistingExecutors and numPendingExecutors, decreased by the number of executors pending to be removed.

If numAdditionalExecutors is negative, an IllegalArgumentException is thrown:

Attempted to request a negative number of additional executor(s) [numAdditionalExecutors] from the cluster manager. Please specify a positive number!
Note
It is a final method that no scheduler backend can customize further.
Note
The method is a synchronized block, so multiple concurrent requests are handled in a serial fashion, i.e. one by one.
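
A self-contained sketch of the flow described above follows. The field names mirror the registries from Table 2, but the class itself and the simplified Boolean result of doRequestTotalExecutors are illustrative, not Spark's source.

import scala.collection.mutable

abstract class RequestingBackendSketch {
  protected var numPendingExecutors = 0
  protected val executorsPendingToRemove = mutable.Set.empty[String]
  protected def numExistingExecutors: Int
  protected def doRequestTotalExecutors(requestedTotal: Int): Boolean

  // Validate the argument, bump the pending counter, and delegate the new
  // desired total to the cluster-specific doRequestTotalExecutors.
  final def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
    if (numAdditionalExecutors < 0)
      throw new IllegalArgumentException(
        s"Attempted to request a negative number of additional executor(s) " +
          s"$numAdditionalExecutors from the cluster manager. Please specify a positive number!")
    numPendingExecutors += numAdditionalExecutors
    val newTotal = numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
    doRequestTotalExecutors(newTotal)
  }
}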

Requesting Exact Number of Executors — requestTotalExecutors Method

requestTotalExecutors(
  numExecutors: Int,
  localityAwareTasks: Int,
  hostToLocalTaskCount: Map[String, Int]): Boolean

requestTotalExecutors is a "decorator" method that ultimately calls a cluster-specific doRequestTotalExecutors method and returns whether the request was acknowledged or not (it is assumed false by default).

It sets the internal localityAwareTasks and hostToLocalTaskCount registries. It then recomputes the number of pending executors as the input numExecutors plus the executors pending removal, decreased by the number of already-assigned executors.

If numExecutors is negative, an IllegalArgumentException is thrown:

Attempted to request a negative number of executor(s) [numExecutors] from the cluster manager. Please specify a positive number!
Note
It is a final method that no scheduler backend can customize further.
Note
The method is a synchronized block, so multiple concurrent requests are handled in a serial fashion, i.e. one by one.
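
The recalculation of pending executors described above can be reduced to a one-liner (names mirror the registries from Table 2; illustrative only):

// Pending executors: requested total plus executors pending removal,
// minus the executors already registered, never below zero.
def recomputePendingExecutors(
    numExecutors: Int,
    numExistingExecutors: Int,
    numPendingToRemove: Int): Int =
  math.max(numExecutors + numPendingToRemove - numExistingExecutors, 0)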

Computing Default Level of Parallelism — defaultParallelism Method

The default parallelism is spark.default.parallelism, if set; otherwise it is totalCoreCount or 2, whichever is greater.

Note
defaultParallelism is part of the SchedulerBackend Contract.
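
A short sketch of the rule above (a plain Map stands in for SparkConf and an AtomicInteger for totalCoreCount):

import java.util.concurrent.atomic.AtomicInteger

def defaultParallelism(conf: Map[String, String], totalCoreCount: AtomicInteger): Int =
  conf.get("spark.default.parallelism").map(_.toInt)
    .getOrElse(math.max(totalCoreCount.get(), 2))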

Killing Task — killTask Method

killTask simply sends a KillTask message to driverEndpoint.

Caution
FIXME Image
Note
killTask is part of the SchedulerBackend Contract.

Stopping All Executors — stopExecutors Method

stopExecutors sends a blocking StopExecutors message to driverEndpoint (if already initialized).

Note
It is called exclusively while CoarseGrainedSchedulerBackend is being stopped.

You should see the following INFO message in the logs:

INFO CoarseGrainedSchedulerBackend: Shutting down all executors

Reset State — reset Method

reset resets the internal state:

  1. Sets numPendingExecutors to 0

  2. Clears executorsPendingToRemove

  3. Sends a blocking RemoveExecutor message to driverEndpoint for every executor (in the internal executorDataMap) to inform it about SlaveLost with the message:

    Stale executor after cluster manager re-registered.

reset is defined in CoarseGrainedSchedulerBackend, but is used and overridden exclusively by YarnSchedulerBackend.
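
A simplified, self-contained sketch of the three steps (askRemoveExecutor stands in for the blocking RemoveExecutor message; the registries mirror Table 2):

import scala.collection.mutable

class ResetSketch(
    executorDataMap: mutable.Map[String, AnyRef],
    askRemoveExecutor: (String, String) => Unit) {

  private var numPendingExecutors = 0
  private val executorsPendingToRemove = mutable.Set.empty[String]

  def reset(): Unit = synchronized {
    numPendingExecutors = 0
    executorsPendingToRemove.clear()
    // Inform the driver endpoint that every known executor is now stale.
    executorDataMap.keys.foreach { executorId =>
      askRemoveExecutor(executorId, "Stale executor after cluster manager re-registered.")
    }
  }
}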

Remove Executor — removeExecutor Method

removeExecutor(executorId: String, reason: ExecutorLossReason)

removeExecutor sends a blocking RemoveExecutor message to driverEndpoint.

CoarseGrainedScheduler RPC Endpoint — driverEndpoint

When CoarseGrainedSchedulerBackend starts, it registers CoarseGrainedScheduler RPC endpoint to be the driver’s communication endpoint.

driverEndpoint is a DriverEndpoint.

Note
CoarseGrainedSchedulerBackend is created while SparkContext is being created that in turn lives inside a Spark driver. That explains the name driverEndpoint (at least partially).

Internally, it is referred to as the standalone scheduler's driver endpoint.

It tracks:

It uses a daemon single-thread pool called driver-revive-thread for …​FIXME

Caution
FIXME A potential issue with driverEndpoint.asInstanceOf[NettyRpcEndpointRef].toURI - doubles spark:// prefix.

Starting CoarseGrainedSchedulerBackend (and Registering CoarseGrainedScheduler RPC Endpoint) — start Method

start(): Unit
Note
start is a part of the SchedulerBackend contract.

start takes all spark.-prefixed properties and registers the CoarseGrainedScheduler RPC endpoint (backed by DriverEndpoint, a ThreadSafeRpcEndpoint).

Figure 1. CoarseGrainedScheduler Endpoint
Note
start uses TaskSchedulerImpl to access the current SparkContext and in turn SparkConf.
Note
start uses RpcEnv that was given when CoarseGrainedSchedulerBackend was created.
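
The property filtering that start performs can be sketched as follows (illustrative; in the real code the properties come from the SparkConf obtained through TaskSchedulerImpl's SparkContext):

// Keep only spark.-prefixed properties; they are later shipped to executors.
def sparkProperties(allProperties: Seq[(String, String)]): Seq[(String, String)] =
  allProperties.filter { case (name, _) => name.startsWith("spark.") }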

Checking If Sufficient Compute Resources Available Or Waiting Time Passed — isReady Method

isReady(): Boolean
Note
isReady is a part of the SchedulerBackend contract.

isReady allows delaying task launching until sufficient resources are available or spark.scheduler.maxRegisteredResourcesWaitingTime passes.

Note
sufficientResourcesRegistered by default responds that sufficient resources are available.

If the resources are available, you should see the following INFO message in the logs and isReady is positive.

INFO SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: [minRegisteredRatio]
Note
minRegisteredRatio is in the range 0 to 1 (uses spark.scheduler.minRegisteredResourcesRatio) to denote the minimum ratio of registered resources to total expected resources before submitting tasks.

If sufficient resources are not yet available (the above requirement does not hold), isReady checks whether the time since startup has exceeded spark.scheduler.maxRegisteredResourcesWaitingTime, which gives a way to launch tasks even though minRegisteredRatio has not been reached yet.

You should see the following INFO message in the logs and isReady is positive.

INFO SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: [maxRegisteredWaitingTimeMs](ms)

Otherwise, when no sufficient resources are available and spark.scheduler.maxRegisteredResourcesWaitingTime has not elapsed, isReady is negative.
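
The whole decision can be captured in a short sketch (illustrative only; times in milliseconds):

def isReady(
    sufficientResourcesRegistered: Boolean,
    createTime: Long,
    maxRegisteredWaitingTimeMs: Long,
    now: Long = System.currentTimeMillis()): Boolean =
  // Ready as soon as enough resources have registered, or once the waiting time has elapsed.
  sufficientResourcesRegistered || (now - createTime) >= maxRegisteredWaitingTimeMs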

Reviving Resource Offers (by Posting ReviveOffers to CoarseGrainedSchedulerBackend RPC Endpoint) — reviveOffers Method

reviveOffers(): Unit
Note
reviveOffers is a part of the SchedulerBackend contract.

reviveOffers sends a ReviveOffers message to CoarseGrainedSchedulerBackend RPC endpoint.

Figure 2. CoarseGrainedExecutorBackend Revives Offers

Stopping CoarseGrainedSchedulerBackend (and Stopping Executors) — stop Method

stop(): Unit
Note
stop is a part of the SchedulerBackend contract.

stop stops all executors and CoarseGrainedScheduler RPC endpoint (by sending a blocking StopDriver message).

In case of any Exception, stop reports a SparkException with the message:

Error stopping standalone scheduler's driver endpoint
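
A sketch of that error handling (stopExecutors and stopDriverEndpoint are passed in for illustration; the real method talks to driverEndpoint and wraps failures in SparkException, represented here by RuntimeException):

def stop(stopExecutors: () => Unit, stopDriverEndpoint: () => Unit): Unit =
  try {
    stopExecutors()
    stopDriverEndpoint() // sends a blocking StopDriver message in the real code
  } catch {
    case e: Exception =>
      throw new RuntimeException("Error stopping standalone scheduler's driver endpoint", e)
  }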

Settings

Table 4. Spark Properties

| Spark Property | Default Value | Description |
|----------------|---------------|-------------|
| spark.scheduler.revive.interval | 1s | Time between reviving resource offers. |
| spark.rpc.message.maxSize | 128 | Maximum message size (in MB) to allow in "control plane" communication; generally only applies to map output size (serialized) information sent between executors and the driver. Increase this if you are running jobs with many thousands of map and reduce tasks and see messages about the RPC message size. |
| spark.scheduler.minRegisteredResourcesRatio | 0 | Double number between 0 and 1 (inclusive) that controls the minimum ratio of registered resources to total expected resources before submitting tasks. See isReady in this document. |
| spark.scheduler.maxRegisteredResourcesWaitingTime | 30s | Time to wait for sufficient resources to become available. See isReady in this document. |
