DriverEndpoint — CoarseGrainedSchedulerBackend RPC Endpoint

DriverEndpoint is a ThreadSafeRpcEndpoint.

Name | Initial Value | Description |
---|---|---|
addressToExecutorId | (empty) | Executor addresses (host and port) for executors. Set when an executor connects to register itself. See RegisterExecutor RPC message. |

Launching Tasks on Executors — launchTasks Method

launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit
launchTasks takes one task (as a TaskDescription) at a time from the input tasks collection. It serializes the TaskDescription and checks the size of the serialized task.
Caution: FIXME At that point, tasks have their executor assigned. When and how did that happen?
If the size of the serialized task is below the maximum RPC message size, launchTasks decrements the number of free cores of the executor assigned to the task by spark.task.cpus (as tracked in the executorDataMap internal registry).
Note: ExecutorData tracks the number of free cores of an executor (as freeCores).
You should see the following DEBUG message in the logs:
DEBUG DriverEndpoint: Launching task [taskId] on executor id: [executorId] hostname: [executorHost].
In the end, launchTasks notifies the associated executor to launch the task (by sending a LaunchTask message to the executor's RPC endpoint with the serialized task inside a SerializableBuffer).
Note: ExecutorData holds the RpcEndpointRef of an executor to send task launch requests to (as executorEndpoint).
If the size of a serialized TaskDescription equals or exceeds the maximum RPC message size, launchTasks finds the TaskSetManager associated with the TaskDescription and aborts it with the following message:
Serialized task [taskId]:[index] was [limit] bytes, which exceeds max allowed: spark.rpc.message.maxSize ([maxRpcMessageSize] bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.
Note: launchTasks uses the registry of active TaskSetManagers per task id from TaskSchedulerImpl that was given when CoarseGrainedSchedulerBackend was created.
Note: Scheduling in Spark relies on cores only (not memory), i.e. the number of tasks Spark can run on an executor is limited only by the number of cores available. When submitting a Spark application for execution, both resources (memory and cores) can be specified explicitly.
Note: launchTasks is used when CoarseGrainedSchedulerBackend makes fake resource offers on executors.
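To make the flow above concrete, here is a minimal, self-contained sketch of the size check and dispatch. Everything in it (the TaskDescription and ExecutorData shapes, the serializer, the hard-coded limits) is a simplified stand-in for Spark's internals, not the actual implementation:

```scala
import java.nio.ByteBuffer
import scala.collection.mutable

// Simplified stand-ins for Spark's internal types.
case class TaskDescription(taskId: Long, index: Int, executorId: String)
case class ExecutorData(var freeCores: Int)

object LaunchTasksSketch {
  val maxRpcMessageSize = 128 * 1024 * 1024 // spark.rpc.message.maxSize (in bytes, assumed)
  val cpusPerTask = 1                       // spark.task.cpus

  val executorDataMap = mutable.Map("exec-1" -> ExecutorData(freeCores = 4))

  // Stand-in serializer; the real code serializes the TaskDescription.
  def serialize(task: TaskDescription): ByteBuffer =
    ByteBuffer.wrap(task.toString.getBytes("UTF-8"))

  def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit =
    for (task <- tasks.flatten) {                 // one task at a time
      val serialized = serialize(task)
      if (serialized.limit() >= maxRpcMessageSize) {
        // The real code aborts the TaskSetManager that owns the task.
        println(s"Serialized task ${task.taskId}:${task.index} was ${serialized.limit()} bytes, " +
          s"which exceeds max allowed: spark.rpc.message.maxSize ($maxRpcMessageSize bytes). " +
          "Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.")
      } else {
        val data = executorDataMap(task.executorId)
        data.freeCores -= cpusPerTask             // give the task its cores
        println(s"Launching task ${task.taskId} on executor id: ${task.executorId}")
        // Real code: data.executorEndpoint.send(LaunchTask(new SerializableBuffer(serialized)))
      }
    }

  def main(args: Array[String]): Unit =
    launchTasks(Seq(Seq(TaskDescription(0L, 0, "exec-1"))))
}
```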
executorIsAlive Internal Method

Caution: FIXME
onStop Callback

Caution: FIXME
onDisconnected Callback
When called, onDisconnected removes the worker from the internal addressToExecutorId registry (which effectively removes the worker from the cluster). While removing, it calls removeExecutor with SlaveLost as the reason and the following message:
Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
Note: onDisconnected is called when a remote host is lost.
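A minimal sketch of that bookkeeping, assuming a simplified RpcAddress type and a stubbed removeExecutor:

```scala
import scala.collection.mutable

object OnDisconnectedSketch {
  case class RpcAddress(host: String, port: Int) // simplified stand-in

  val addressToExecutorId = mutable.Map(RpcAddress("10.0.0.2", 36000) -> "exec-1")

  def removeExecutor(executorId: String, reason: String): Unit =
    println(s"Removing executor $executorId: $reason")

  // Look the executor up by its RPC address and remove it with a
  // SlaveLost-style reason, mirroring the description above.
  def onDisconnected(remoteAddress: RpcAddress): Unit =
    addressToExecutorId.remove(remoteAddress).foreach { executorId =>
      removeExecutor(executorId,
        "Remote RPC client disassociated. Likely due to containers exceeding " +
          "thresholds, or network issues. Check driver logs for WARN messages.")
    }

  def main(args: Array[String]): Unit =
    onDisconnected(RpcAddress("10.0.0.2", 36000))
}
```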
StatusUpdate
StatusUpdate(
executorId: String,
taskId: Long,
state: TaskState,
data: SerializableBuffer)
extends CoarseGrainedClusterMessage
Caution: FIXME
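The handler itself is still a FIXME above. Purely as a hypothetical illustration (simplified stand-in types, not Spark's actual code), a driver-side handler for such a message could update the task's state and, for a finished task, return its cores to the executor so it becomes eligible for new offers:

```scala
import java.nio.ByteBuffer
import scala.collection.mutable

object StatusUpdateSketch {
  sealed trait TaskState
  case object RUNNING  extends TaskState
  case object FINISHED extends TaskState

  case class ExecutorData(var freeCores: Int)

  val cpusPerTask = 1 // spark.task.cpus
  val executorDataMap = mutable.Map("exec-1" -> ExecutorData(freeCores = 3))

  def handleStatusUpdate(executorId: String, taskId: Long,
                         state: TaskState, data: ByteBuffer): Unit = {
    // 1. Hand the update (with the serialized result) to the task scheduler.
    println(s"statusUpdate(taskId=$taskId, state=$state, ${data.limit()} bytes)")
    // 2. A task in a terminal state gives its cores back to its executor.
    if (state != RUNNING) {
      executorDataMap.get(executorId).foreach(_.freeCores += cpusPerTask)
      // Plausibly followed by re-offering the executor's resources.
    }
  }

  def main(args: Array[String]): Unit =
    handleStatusUpdate("exec-1", 0L, FINISHED, ByteBuffer.allocate(0))
}
```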
StopExecutors
StopExecutors message is receive-reply and blocking. When received, the following INFO message appears in the logs:
INFO Asking each executor to shut down
It then sends a StopExecutor message to every registered executor (from executorDataMap).
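A minimal sketch of the fan-out, with a stubbed EndpointRef standing in for Spark's RpcEndpointRef:

```scala
import scala.collection.mutable

object StopExecutorsSketch {
  case object StopExecutor

  trait EndpointRef { def send(msg: Any): Unit } // stand-in for RpcEndpointRef
  case class ExecutorData(executorEndpoint: EndpointRef)

  val executorDataMap = mutable.Map(
    "exec-1" -> ExecutorData(new EndpointRef {
      def send(msg: Any): Unit = println(s"exec-1 <- $msg")
    }))

  def stopExecutors(): Unit = {
    println("Asking each executor to shut down")
    // Fan StopExecutor out to every registered executor.
    executorDataMap.values.foreach(_.executorEndpoint.send(StopExecutor))
    // The real handler then replies to the (blocking) asker.
  }

  def main(args: Array[String]): Unit = stopExecutors()
}
```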
RegisterExecutor
RegisterExecutor(
executorId: String,
executorRef: RpcEndpointRef,
hostname: String,
cores: Int,
logUrls: Map[String, String])
extends CoarseGrainedClusterMessage
Note: RegisterExecutor is sent when CoarseGrainedExecutorBackend (RPC Endpoint) starts accepting messages.

Only one executor can register under an executorId. When the registration succeeds, you should see the following INFO message in the logs:

INFO Registered executor [executorRef] ([executorAddress]) with ID [executorId]
It does internal bookkeeping, updating addressToExecutorId, totalCoreCount, totalRegisteredExecutors, and executorDataMap.
When numPendingExecutors is greater than 0, the following message is printed out to the logs:
DEBUG Decremented number of pending executors ([numPendingExecutors] left)
CoarseGrainedSchedulerBackend sends a RegisteredExecutor message back (which confirms the executor's registration).
Note: The executor's RpcEndpointRef is specified as part of RegisterExecutor.
It then announces the new executor by posting SparkListenerExecutorAdded to LiveListenerBus.
Ultimately, makeOffers is called.
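Putting the registration steps together, a simplified sketch (the registry names follow the ones mentioned above; replying with RegisteredExecutor, posting SparkListenerExecutorAdded, and calling makeOffers are only noted in comments):

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

object RegisterExecutorSketch {
  case class RpcAddress(host: String, port: Int)
  case class ExecutorData(address: RpcAddress, var freeCores: Int, totalCores: Int)

  val addressToExecutorId      = mutable.Map.empty[RpcAddress, String]
  val executorDataMap          = mutable.Map.empty[String, ExecutorData]
  val totalCoreCount           = new AtomicInteger(0)
  val totalRegisteredExecutors = new AtomicInteger(0)
  var numPendingExecutors      = 0

  def registerExecutor(executorId: String, address: RpcAddress, cores: Int): Unit =
    if (executorDataMap.contains(executorId)) {
      println(s"Duplicate executor ID: $executorId") // only one executor per ID
    } else {
      println(s"Registered executor ($address) with ID $executorId")
      addressToExecutorId(address) = executorId
      totalCoreCount.addAndGet(cores)
      totalRegisteredExecutors.incrementAndGet()
      executorDataMap(executorId) = ExecutorData(address, cores, cores)
      if (numPendingExecutors > 0) {
        numPendingExecutors -= 1
        println(s"Decremented number of pending executors ($numPendingExecutors left)")
      }
      // Real code: reply RegisteredExecutor to the executor, post
      // SparkListenerExecutorAdded to LiveListenerBus, then makeOffers().
    }

  def main(args: Array[String]): Unit =
    registerExecutor("exec-1", RpcAddress("10.0.0.2", 36000), 4)
}
```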
Scheduling Sending ReviveOffers Periodically — onStart Callback
onStart(): Unit
Note: onStart is a part of the RpcEndpoint contract that is executed before an RPC endpoint starts accepting messages.
onStart schedules a periodic action that sends ReviveOffers immediately and then every spark.scheduler.revive.interval.
Note: spark.scheduler.revive.interval defaults to 1s.
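A sketch of the scheduling itself, using a plain ScheduledExecutorService in place of Spark's RPC machinery (the 1000 ms constant stands in for spark.scheduler.revive.interval):

```scala
import java.util.concurrent.{Executors, TimeUnit}

object OnStartSketch {
  case object ReviveOffers

  // Stand-in for sending a message to the endpoint itself over RPC.
  def selfSend(msg: Any): Unit = println(s"driver <- $msg")

  def main(args: Array[String]): Unit = {
    val reviveIntervalMs = 1000L // spark.scheduler.revive.interval (default 1s)
    val reviveThread = Executors.newSingleThreadScheduledExecutor()

    // Initial delay 0: the first ReviveOffers goes out immediately,
    // then one per interval.
    reviveThread.scheduleAtFixedRate(
      new Runnable { def run(): Unit = selfSend(ReviveOffers) },
      0, reviveIntervalMs, TimeUnit.MILLISECONDS)

    Thread.sleep(3500) // let a few ticks fire
    reviveThread.shutdown()
  }
}
```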
Making Resource Offers — ReviveOffers Message
case object ReviveOffers extends CoarseGrainedClusterMessage
When received, ReviveOffers simply makes executor resource offers (in the form of the free cores and host names of active executors).
Note: Active executors are executors not registered in the executorsPendingToRemove and executorsPendingLossReason internal registries.
Note: ReviveOffers is sent periodically soon after CoarseGrainedSchedulerBackend RPC endpoint starts accepting messages and when CoarseGrainedExecutorBackend revives resource offers.
Making Executor Resource Offers (for Launching Tasks) — makeOffers Internal Method
makeOffers(): Unit
makeOffers first creates WorkerOffers for all active executors (registered in the internal executorDataMap cache).
Note: WorkerOffer represents a resource offer with CPU cores available on an executor.
makeOffers then requests TaskSchedulerImpl to generate tasks for the available WorkerOffers and launches the tasks on the respective executors.
Note: makeOffers uses the TaskSchedulerImpl that was given when CoarseGrainedSchedulerBackend was created.
Note: Tasks are described using TaskDescription that holds…FIXME
Note: makeOffers is used when the CoarseGrainedSchedulerBackend RPC endpoint handles ReviveOffers or RegisterExecutor messages.
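An end-to-end sketch of the flow, with a dummy resourceOffers standing in for TaskSchedulerImpl.resourceOffers and the launchTasks from above reduced to a println:

```scala
import scala.collection.mutable

object MakeOffersSketch {
  case class WorkerOffer(executorId: String, host: String, cores: Int)
  case class TaskDescription(taskId: Long, executorId: String)
  case class ExecutorData(host: String, var freeCores: Int)

  val executorDataMap = mutable.Map(
    "exec-1" -> ExecutorData("host-a", 2),
    "exec-2" -> ExecutorData("host-b", 4))
  val executorsPendingToRemove = mutable.Set("exec-2") // not an active executor

  // Dummy stand-in for TaskSchedulerImpl.resourceOffers: one task per free core.
  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] =
    offers.map(o => (0 until o.cores).map(i => TaskDescription(i.toLong, o.executorId)))

  def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit =
    tasks.flatten.foreach(t => println(s"Launching task ${t.taskId} on ${t.executorId}"))

  def makeOffers(): Unit = {
    // WorkerOffers for active executors only (free cores and host name).
    val offers = executorDataMap
      .filter { case (id, _) => !executorsPendingToRemove(id) }
      .map { case (id, data) => WorkerOffer(id, data.host, data.freeCores) }
      .toSeq
    launchTasks(resourceOffers(offers))
  }

  def main(args: Array[String]): Unit = makeOffers()
}
```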