DriverEndpoint — CoarseGrainedSchedulerBackend RPC Endpoint

DriverEndpoint is a ThreadSafeRpcEndpoint.

Table 1. DriverEndpoint’s Internal Properties

executorsPendingLossReason
Executors that have been lost, but for which the real exit reason is not yet known.

addressToExecutorId
Executor addresses (host and port) of registered executors. Set when an executor connects to register itself. See the RegisterExecutor RPC message.

Launching Tasks on Executors — launchTasks Method

launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit

launchTasks takes one task (as a TaskDescription) at a time from the input tasks collection.

launchTasks serializes every TaskDescription and checks the size of the serialized form.

Caution
FIXME At that point, tasks have their executor assigned. When and how did that happen?

If the size of the serialized task is below the maximum RPC message size, launchTasks decrements the free cores of the executor assigned to the task by spark.task.cpus (the executor is tracked in the executorDataMap internal registry).

Note
ExecutorData tracks the number of free cores of an executor (as freeCores).

You should see the following DEBUG message in the logs:

DEBUG DriverEndpoint: Launching task [taskId] on executor id: [executorId] hostname: [executorHost].

In the end, launchTasks notifies the assigned executor to launch the task (by sending a LaunchTask message to the executor’s RPC endpoint with the serialized task inside a SerializableBuffer).

Note
ExecutorData holds the RpcEndpointRef of an executor to send task launch requests to (as executorEndpoint).

In case the size of a serialized TaskDescription equals or exceeds the maximum RPC message size, launchTasks finds the TaskSetManager associated with the TaskDescription and aborts it with the following message:

Serialized task [taskId]:[index] was [limit] bytes, which exceeds max allowed: spark.rpc.message.maxSize ([maxRpcMessageSize] bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.
Note
Scheduling in Spark relies on cores only (not memory), i.e. the number of tasks Spark can run concurrently on an executor is limited only by the number of available cores. When submitting a Spark application for execution, both resources (memory and cores) can be specified explicitly.
Note
launchTasks is used when CoarseGrainedSchedulerBackend makes fake resource offers on executors.
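
Put together, the flow can be condensed into the following sketch (simplified from DriverEndpoint’s launchTasks; not verbatim source):

private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
  for (task <- tasks.flatten) {
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= maxRpcMessageSize) {
      // Too large to send over RPC -- abort the owning TaskSetManager
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        taskSetMgr.abort(s"Serialized task ${task.taskId}:${task.index} was " +
          s"${serializedTask.limit} bytes, which exceeds max allowed: " +
          s"spark.rpc.message.maxSize ($maxRpcMessageSize bytes). Consider increasing " +
          "spark.rpc.message.maxSize or using broadcast variables for large values.")
      }
    } else {
      // Claim spark.task.cpus cores on the assigned executor...
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      // ...and ask it to launch the task
      executorData.executorEndpoint.send(
        LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}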

executorIsAlive Internal Method

Caution
FIXME

onStop Callback

Caution
FIXME

onDisconnected Callback

When called, onDisconnected removes the executor (looked up by its RPC address) from the internal addressToExecutorId registry, which effectively removes the executor from the cluster.

While removing, it calls removeExecutor with the reason being SlaveLost and the following message:

Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
Note
onDisconnected is called when a remote host is lost.
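
A minimal sketch of the callback (assuming the addressToExecutorId registry and removeExecutor described above):

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
  // A lost RPC connection maps back to an executor via its address
  addressToExecutorId.get(remoteAddress).foreach { executorId =>
    removeExecutor(executorId, SlaveLost("Remote RPC client disassociated. " +
      "Likely due to containers exceeding thresholds, or network issues. " +
      "Check driver logs for WARN messages."))
  }
}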

KillTask

RemoveExecutor

RetrieveSparkProps

StatusUpdate

StatusUpdate(
  executorId: String,
  taskId: Long,
  state: TaskState,
  data: SerializableBuffer)
extends CoarseGrainedClusterMessage
Caution
FIXME

StopDriver

StopDriver message stops the RPC endpoint.
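
In sketch form (simplified, not verbatim):

case StopDriver =>
  context.reply(true)  // acknowledge the sender first
  stop()               // then stop the RPC endpoint itself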

StopExecutors

StopExecutors message is receive-reply and blocking. When received, the following INFO message appears in the logs:

INFO Asking each executor to shut down

It then sends a StopExecutor message to every registered executor (from executorDataMap).
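
In sketch form (simplified):

case StopExecutors =>
  logInfo("Asking each executor to shut down")
  for ((_, executorData) <- executorDataMap) {
    executorData.executorEndpoint.send(StopExecutor)
  }
  context.reply(true)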

RegisterExecutor

RegisterExecutor(
  executorId: String,
  executorRef: RpcEndpointRef,
  hostname: String,
  cores: Int,
  logUrls: Map[String, String])
extends CoarseGrainedClusterMessage
Figure 1. Executor registration (RegisterExecutor RPC message flow)

Only one executor can register under a given executorId; a duplicate registration attempt is rejected (with a RegisterExecutorFailed reply). On successful registration you should see the following INFO message in the logs:

INFO Registered executor [executorRef] ([executorAddress]) with ID [executorId]

It does internal bookkeeping like updating addressToExecutorId, totalCoreCount, totalRegisteredExecutors, and executorDataMap.

When numPendingExecutors is greater than 0, it is decremented and the following is printed out to the logs:

DEBUG Decremented number of pending executors ([numPendingExecutors] left)

CoarseGrainedSchedulerBackend sends a RegisteredExecutor message back (which confirms the executor’s registration).

Note
The executor’s RpcEndpointRef is specified as part of RegisterExecutor.

It then announces the new executor by posting SparkListenerExecutorAdded to LiveListenerBus.

Ultimately, makeOffers is called.
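
The registration handling above can be condensed into the following sketch (simplified from DriverEndpoint’s receiveAndReply; synchronization and error handling elided):

case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
  if (executorDataMap.contains(executorId)) {
    // Only one executor may register per executorId
    executorRef.send(RegisterExecutorFailed(s"Duplicate executor ID: $executorId"))
    context.reply(true)
  } else {
    val executorAddress = executorRef.address
    logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
    // Internal bookkeeping
    addressToExecutorId(executorAddress) = executorId
    totalCoreCount.addAndGet(cores)
    totalRegisteredExecutors.addAndGet(1)
    val data = new ExecutorData(executorRef, executorAddress, hostname,
      cores, cores, logUrls)
    executorDataMap.put(executorId, data)
    if (numPendingExecutors > 0) {
      numPendingExecutors -= 1
      logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
    }
    // Confirm the registration and announce the new executor
    executorRef.send(RegisteredExecutor)
    context.reply(true)
    listenerBus.post(SparkListenerExecutorAdded(
      System.currentTimeMillis(), executorId, data))
    makeOffers()
  }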

Scheduling Sending ReviveOffers Periodically — onStart Callback

onStart(): Unit
Note
onStart is part of the RpcEndpoint contract and is executed before an RPC endpoint starts accepting messages.

onStart schedules a periodic action that sends ReviveOffers (to the DriverEndpoint itself), starting immediately and repeating every spark.scheduler.revive.interval.

Note
spark.scheduler.revive.interval defaults to 1s.
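
A sketch of the callback (assuming a reviveThread scheduled executor service inside DriverEndpoint):

override def onStart(): Unit = {
  val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
  reviveThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      // Send ReviveOffers to this very endpoint
      Option(self).foreach(_.send(ReviveOffers))
    }
  }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
}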

Making Resource Offers — ReviveOffers Message

case object ReviveOffers extends CoarseGrainedClusterMessage

When received, ReviveOffers simply makes executor resource offers (in the form of the free cores and host names of active executors).

Note
Active executors are executors not registered in executorsPendingToRemove and executorsPendingLossReason internal registries.
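
The handler itself simply delegates to makeOffers (described in the next section):

case ReviveOffers =>
  makeOffers()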

Making Executor Resource Offers (for Launching Tasks) — makeOffers Internal Method

makeOffers(): Unit

makeOffers first creates WorkerOffers for all active executors (registered in the internal executorDataMap cache). It then requests TaskSchedulerImpl for tasks to launch (given the WorkerOffers) and launches them using launchTasks.

Note
WorkerOffer represents a resource offer with CPU cores available on an executor.
Note
makeOffers uses TaskSchedulerImpl that was given when CoarseGrainedSchedulerBackend was created.
Note
Tasks are described using TaskDescription that holds…FIXME
Note
makeOffers is used when CoarseGrainedSchedulerBackend RPC endpoint handles ReviveOffers or RegisterExecutor messages.
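
A condensed sketch of makeOffers (simplified; executorIsAlive filters out executors pending removal or a loss reason):

private def makeOffers(): Unit = {
  // Offer the free cores of every active executor
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toIndexedSeq
  // TaskSchedulerImpl decides which tasks to run on which executor
  launchTasks(scheduler.resourceOffers(workOffers))
}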
