CoarseGrainedSchedulerBackend
CoarseGrainedSchedulerBackend is a SchedulerBackend and an ExecutorAllocationClient.
CoarseGrainedSchedulerBackend is responsible for requesting resources from a cluster manager for executors that it in turn uses to launch tasks (on coarse-grained executors).
CoarseGrainedSchedulerBackend holds executors for the duration of the Spark job rather than relinquishing executors whenever a task is done and asking the scheduler to launch a new executor for each new task.
| Caution | FIXME Picture with dependencies |
It registers the CoarseGrainedScheduler RPC endpoint that executors use for RPC communication.
| Cluster Environment | CoarseGrainedSchedulerBackend Implementation |
|---|---|
| Spark Standalone | SparkDeploySchedulerBackend |
| Spark on YARN | YarnSchedulerBackend |
| Spark on Mesos | CoarseMesosSchedulerBackend |
| Note | CoarseGrainedSchedulerBackend is only created indirectly through the built-in implementations per cluster environment (see the table above). |
| Name | Initial Value | Description |
|---|---|---|
| maxRpcMessageSize | spark.rpc.message.maxSize but not greater than 2047 | Maximum RPC message size in MB. When above 2047, you should see the error: spark.rpc.message.maxSize should not be greater than 2047 MB |
| executorDataMap | empty | Registry of ExecutorData by executor id. NOTE: Element added when DriverEndpoint handles a RegisterExecutor message. |
| totalCoreCount | 0 | Total number of CPU cores, i.e. the sum of all the cores on all executors. |
| totalRegisteredExecutors | 0 | Total number of registered executors. |
| driverEndpoint | (uninitialized) | RPC endpoint reference to the CoarseGrainedScheduler RPC endpoint. |
| currentExecutorIdCounter | 0 | The last (highest) identifier of all allocated executors. Used exclusively in YarnSchedulerEndpoint. |
| executorsPendingToRemove | empty | Executors marked as removed but for which the confirmation from a cluster manager has not arrived yet. |
| hostToLocalTaskCount | empty | Registry of hostnames and the possible number of tasks running on them. |
| createTime | Current time | The time CoarseGrainedSchedulerBackend was created. |
| localityAwareTasks | 0 | Number of pending tasks…FIXME |
| defaultAskTimeout | spark.rpc.askTimeout or spark.network.timeout or 120s | Default timeout for blocking RPC messages (aka ask messages). |
| Tip | Enable INFO or DEBUG logging levels for the org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend logger to see what happens inside. Add the following line to conf/log4j.properties: log4j.logger.org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend=DEBUG. Refer to Logging. |
Making Fake Resource Offers on Executors — makeOffers Internal Methods
makeOffers(): Unit
makeOffers(executorId: String): Unit
makeOffers takes the active executors (out of the executorDataMap internal registry) and creates WorkerOffer resource offers for each (one per executor with the executor’s id, host and free cores).
| Caution | Only free cores are considered in making offers. Memory is not! Why?! |
It then requests TaskSchedulerImpl to process the resource offers to create a collection of TaskDescription collections that it in turn uses to launch tasks.
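Below is a hedged Scala sketch of makeOffers based on the description above (simplified and not a verbatim copy of Spark's sources; launchTasks is the internal helper that sends the selected tasks to executors):

```scala
// A sketch, assuming the internal registries described on this page.
private def makeOffers(): Unit = {
  // One WorkerOffer per executor: id, host and free cores only.
  val workOffers = executorDataMap.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toIndexedSeq
  // TaskSchedulerImpl turns the offers into TaskDescription collections
  // that are then used to launch tasks.
  launchTasks(scheduler.resourceOffers(workOffers))
}
```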
Creating CoarseGrainedSchedulerBackend Instance
CoarseGrainedSchedulerBackend takes the following when created:

- TaskSchedulerImpl
- RpcEnv

CoarseGrainedSchedulerBackend initializes the internal registries and counters.
Getting Executor Ids — getExecutorIds Method
When called, getExecutorIds simply returns executor ids from the internal executorDataMap registry.
| Note | getExecutorIds is used when SparkContext calculates executor ids. |
CoarseGrainedSchedulerBackend Contract
class CoarseGrainedSchedulerBackend {
def minRegisteredRatio: Double
def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint
def reset(): Unit
def sufficientResourcesRegistered(): Boolean
def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean]
def doKillExecutors(executorIds: Seq[String]): Future[Boolean]
}
| Note | CoarseGrainedSchedulerBackend is a private[spark] contract. |
| Method | Description |
|---|---|
| minRegisteredRatio | Ratio between registered and total expected resources. Controlled by spark.scheduler.minRegisteredResourcesRatio. |
| sufficientResourcesRegistered | Always positive, i.e. true, by default. Used when isReady checks whether sufficient resources are registered. |
numExistingExecutors Method
| Caution | FIXME |
killExecutors Methods
| Caution | FIXME |
applicationId Method
| Caution | FIXME |
getDriverLogUrls Method
| Caution | FIXME |
applicationAttemptId Method
| Caution | FIXME |
Requesting Additional Executors — requestExecutors Method
requestExecutors(numAdditionalExecutors: Int): Boolean
requestExecutors is a "decorator" method that ultimately calls a cluster-specific doRequestTotalExecutors method and returns whether the request was acknowledged or not (it is assumed false by default).
| Note | requestExecutors is part of the ExecutorAllocationClient contract that SparkContext uses for requesting additional executors (as part of a developer API for dynamic allocation of executors). |
When called, you should see the following INFO message followed by a DEBUG message in the logs:
INFO Requesting [numAdditionalExecutors] additional executor(s) from the cluster manager
DEBUG Number of pending executors is now [numPendingExecutors]
numPendingExecutors is increased by the input numAdditionalExecutors.
requestExecutors then requests the new total number of executors from the cluster manager (reflecting the current computation needs): the sum of the internal numExistingExecutors and numPendingExecutors, decreased by the number of executors pending removal.
If numAdditionalExecutors is negative, an IllegalArgumentException is thrown:
Attempted to request a negative number of additional executor(s) [numAdditionalExecutors] from the cluster manager. Please specify a positive number!
| Note | requestExecutors is a final method that no other scheduler backends can customize further. |
| Note | The method is a synchronized block, so multiple concurrent requests are handled serially, i.e. one by one. |
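As a usage sketch, the developer API on SparkContext delegates to requestExecutors on coarse-grained scheduler backends (the master and values below are illustrative assumptions):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumes a cluster manager that supports dynamic executor requests.
val sc = new SparkContext(
  new SparkConf().setAppName("request-executors-demo").setMaster("yarn"))

// Ask the cluster manager for 2 more executors. The result tells whether
// the request was acknowledged (false on unsupported backends).
val acknowledged: Boolean = sc.requestExecutors(2)
```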
Requesting Exact Number of Executors — requestTotalExecutors Method
requestTotalExecutors(
numExecutors: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int]): Boolean
requestTotalExecutors is a "decorator" method that ultimately calls a cluster-specific doRequestTotalExecutors method and returns whether the request was acknowledged or not (it is assumed false by default).
| Note | requestTotalExecutors is part of the ExecutorAllocationClient contract that SparkContext uses for requesting the exact number of executors. |
requestTotalExecutors sets the internal localityAwareTasks and hostToLocalTaskCount registries. It then recomputes the number of pending executors: the input numExecutors plus the executors pending removal, decreased by the number of already-registered executors (never below zero).
If numExecutors is negative, an IllegalArgumentException is thrown:
Attempted to request a negative number of executor(s) [numExecutors] from the cluster manager. Please specify a positive number!
| Note | requestTotalExecutors is a final method that no other scheduler backends can customize further. |
| Note | The method is a synchronized block, so multiple concurrent requests are handled serially, i.e. one by one. |
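A minimal sketch of that arithmetic (the helper name is hypothetical; the real computation happens inline in the synchronized block):

```scala
// New number of pending executors after a requestTotalExecutors call.
def newNumPendingExecutors(
    numExecutors: Int,          // requested total
    numExistingExecutors: Int,  // already-registered executors
    numPendingToRemove: Int     // executorsPendingToRemove.size
): Int =
  math.max(numExecutors + numPendingToRemove - numExistingExecutors, 0)
```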
Computing Default Level of Parallelism — defaultParallelism Method
The default level of parallelism is spark.default.parallelism, if set, or the maximum of totalCoreCount and 2 otherwise.
| Note | defaultParallelism is part of the SchedulerBackend contract. |
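A minimal sketch of the computation, assuming conf is the SparkConf and totalCoreCount the internal counter described earlier:

```scala
// spark.default.parallelism wins if set; otherwise at least 2.
override def defaultParallelism(): Int =
  conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
```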
Killing Task — killTask Method
killTask simply sends a KillTask message to driverEndpoint.
| Caution | FIXME Image |
| Note | killTask is part of the SchedulerBackend contract. |
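A hedged sketch of killTask, assuming the three-argument signature of this Spark version and the KillTask message described on this page:

```scala
// Forward the kill request to the driver endpoint.
override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
  driverEndpoint.send(KillTask(taskId, executorId, interruptThread))
}
```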
Stopping All Executors — stopExecutors Method
stopExecutors sends a blocking StopExecutors message to driverEndpoint (if already initialized).
| Note | stopExecutors is called exclusively while CoarseGrainedSchedulerBackend is being stopped. |
You should see the following INFO message in the logs:
INFO CoarseGrainedSchedulerBackend: Shutting down all executors
Reset State — reset Method
reset resets the internal state:

- Sets numPendingExecutors to 0
- Clears executorsPendingToRemove
- Sends a blocking RemoveExecutor message to driverEndpoint for every executor (in the internal executorDataMap) to inform it about SlaveLost with the message: "Stale executor after cluster manager re-registered."

reset is defined in CoarseGrainedSchedulerBackend, but is used and overridden exclusively by YarnSchedulerBackend.
Remove Executor — removeExecutor Method
removeExecutor(executorId: String, reason: ExecutorLossReason)
removeExecutor sends a blocking RemoveExecutor message to driverEndpoint.
| Note | removeExecutor is called by the subclasses SparkDeploySchedulerBackend, CoarseMesosSchedulerBackend, and YarnSchedulerBackend. |
CoarseGrainedScheduler RPC Endpoint — driverEndpoint
When CoarseGrainedSchedulerBackend starts, it registers CoarseGrainedScheduler RPC endpoint to be the driver’s communication endpoint.
driverEndpoint is a DriverEndpoint.
| Note | CoarseGrainedSchedulerBackend is created while SparkContext is being created (which in turn lives inside a Spark driver). That explains the name driverEndpoint (at least partially). |

Internally, it is known as the standalone scheduler's driver endpoint.

It tracks:

It uses a single-thread daemon thread pool named driver-revive-thread for …FIXME
| Caution | FIXME A potential issue with driverEndpoint.asInstanceOf[NettyRpcEndpointRef].toURI - doubles spark:// prefix. |
Starting CoarseGrainedSchedulerBackend (and Registering CoarseGrainedScheduler RPC Endpoint) — start Method
start(): Unit
| Note | start is part of the SchedulerBackend contract. |
start takes all spark.-prefixed properties and registers the CoarseGrainedScheduler RPC endpoint (backed by DriverEndpoint, a ThreadSafeRpcEndpoint).
| Note | start uses TaskSchedulerImpl to access the current SparkContext and, in turn, SparkConf. |
| Note | start uses the RpcEnv that was given when CoarseGrainedSchedulerBackend was created. |
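A hedged sketch of start, simplified from the description above (property filtering and endpoint registration only; the real method also wires up additional state):

```scala
override def start(): Unit = {
  // Collect all spark.-prefixed properties from the SparkConf
  // (reached through TaskSchedulerImpl and SparkContext).
  val properties = scheduler.sc.conf.getAll
    .filter { case (k, _) => k.startsWith("spark.") }
    .toSeq
  // Register the CoarseGrainedScheduler RPC endpoint.
  driverEndpoint = rpcEnv.setupEndpoint(
    "CoarseGrainedScheduler", createDriverEndpoint(properties))
}
```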
Checking If Sufficient Compute Resources Available Or Waiting Time Passed — isReady Method
isReady(): Boolean
| Note | isReady is part of the SchedulerBackend contract. |
isReady allows delaying task launching until sufficient resources are available or spark.scheduler.maxRegisteredResourcesWaitingTime has passed.
Internally, isReady checks whether there are sufficient resources available.
| Note | sufficientResourcesRegistered responds that sufficient resources are available by default. |
If the resources are available, you should see the following INFO message in the logs and isReady is positive.
INFO SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: [minRegisteredRatio]
| Note | minRegisteredRatio is in the range 0 to 1 (uses spark.scheduler.minRegisteredResourcesRatio) and denotes the minimum ratio of registered resources to total expected resources before submitting tasks. |
If sufficient resources are not yet available (i.e. the above requirement does not hold), isReady checks whether the time since startup has passed spark.scheduler.maxRegisteredResourcesWaitingTime, to give a way to launch tasks even though minRegisteredRatio has not been reached yet.
You should see the following INFO message in the logs and isReady is positive.
INFO SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: [maxRegisteredWaitingTimeMs](ms)
Otherwise, when sufficient resources are not available and spark.scheduler.maxRegisteredResourcesWaitingTime has not elapsed, isReady is negative.
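A minimal sketch of isReady's decision, using the names on this page (logging elided):

```scala
override def isReady(): Boolean = {
  if (sufficientResourcesRegistered()) {
    true  // minRegisteredRatio reached
  } else if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTimeMs) {
    true  // spark.scheduler.maxRegisteredResourcesWaitingTime elapsed
  } else {
    false // keep waiting
  }
}
```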
Reviving Resource Offers (by Posting ReviveOffers to CoarseGrainedSchedulerBackend RPC Endpoint) — reviveOffers Method
reviveOffers(): Unit
| Note | reviveOffers is part of the SchedulerBackend contract. |
reviveOffers sends a ReviveOffers message to CoarseGrainedSchedulerBackend RPC endpoint.
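A minimal sketch, assuming the driverEndpoint reference and ReviveOffers message described on this page:

```scala
override def reviveOffers(): Unit = {
  driverEndpoint.send(ReviveOffers)
}
```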
Stopping CoarseGrainedSchedulerBackend (and Stopping Executors) — stop Method
stop(): Unit
| Note | stop is part of the SchedulerBackend contract. |
stop stops all executors and CoarseGrainedScheduler RPC endpoint (by sending a blocking StopDriver message).
In case of any Exception, stop reports a SparkException with the message:
Error stopping standalone scheduler's driver endpoint
Settings
| Spark Property | Default Value | Description |
|---|---|---|
| spark.scheduler.revive.interval | 1s | Time (in milliseconds) between reviving offers. |
| spark.rpc.message.maxSize | 128 | Maximum message size (in MB) to allow in "control plane" communication; generally only applies to map output size (serialized) information sent between executors and the driver. Increase this if you are running jobs with many thousands of map and reduce tasks and see messages about the RPC message size. |
| spark.scheduler.minRegisteredResourcesRatio | 0 | Double number between 0 and 1 (inclusive) that controls the minimum ratio of (registered resources / total expected resources) before submitting tasks. See isReady in this document. |
| spark.scheduler.maxRegisteredResourcesWaitingTime | 30s | Time to wait for sufficient resources available. See isReady in this document. |
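A hypothetical example of tuning these settings through SparkConf (all values are illustrative only):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.rpc.message.maxSize", "256")                    // MB; must not exceed 2047
  .set("spark.scheduler.minRegisteredResourcesRatio", "0.8")  // wait for 80% of resources
  .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "60s")
  .set("spark.scheduler.revive.interval", "1s")
```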