TaskSchedulerImpl — Default TaskScheduler
TaskSchedulerImpl is the default TaskScheduler.
TaskSchedulerImpl can schedule tasks for multiple types of cluster managers by means of SchedulerBackends.
When a Spark application starts (i.e. an instance of SparkContext is created), a TaskSchedulerImpl is created along with a SchedulerBackend and a DAGScheduler, and they are all started soon afterwards.
TaskSchedulerImpl generates tasks for executor resource offers.
TaskSchedulerImpl can track racks per host and port (which, however, is only used with the Hadoop YARN cluster manager).
You can select the scheduling policy using the spark.scheduler.mode setting.
TaskSchedulerImpl submits tasks using SchedulableBuilders.
TaskSchedulerImpl uses the following internal registries and counters:

| Name | Description |
|---|---|
| FIXME | Flag…FIXME Used when…FIXME |
| FIXME | Used when…FIXME |
| executorIdToHost | Lookup table of hosts per executor. Used when…FIXME |
| executorIdToRunningTaskIds | Lookup table of running tasks per executor. Used when…FIXME |
| executorIdToTaskCount | Lookup table of the number of running tasks per executor. Used when…FIXME |
| executorsByHost | Collection of executors per host. Used when…FIXME |
| hostToExecutors | Lookup table of executors per host in a cluster. Used when…FIXME |
| hostsByRack | Lookup table of hosts per rack. Used when…FIXME |
| FIXME | The next task id (counting from…FIXME). Used when…FIXME |
| taskSetsByStageIdAndAttempt | Lookup table of TaskSets by stage and attempt ids. |
| taskIdToExecutorId | Lookup table of executors by task id. |
| taskIdToTaskSetManager | Registry of active TaskSetManagers by task id. |
> **Tip**: Enable DEBUG logging level for the org.apache.spark.scheduler.TaskSchedulerImpl logger to see what happens inside. Add the following line to conf/log4j.properties: log4j.logger.org.apache.spark.scheduler.TaskSchedulerImpl=DEBUG. Refer to Logging.
getExecutorsAliveOnHost Method
> **Caution**: FIXME
createTaskSetManager Method
> **Caution**: FIXME

> **Note**: createTaskSetManager is used exclusively when TaskSchedulerImpl submits tasks (for a given TaskSet).
isExecutorAlive Method
> **Caution**: FIXME
hasExecutorsAliveOnHost Method
> **Caution**: FIXME
hasHostAliveOnRack Method
> **Caution**: FIXME
Creating TaskDescriptions For Available Executor Resource Offers (with CPU Cores) — resourceOffers Method
resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]]
resourceOffers takes the resource offers (as WorkerOffers) and generates a collection of tasks (as TaskDescriptions) to launch, given the resources available.
> **Note**: WorkerOffer represents a resource offer with CPU cores free to use on an executor.
Internally, resourceOffers first updates hostToExecutors and executorIdToHost lookup tables to record new hosts and executors (given the input offers).
For new executors (not in executorIdToRunningTaskIds) resourceOffers notifies DAGScheduler that an executor was added.
> **Note**: TaskSchedulerImpl uses resourceOffers to track active executors.

> **Caution**: FIXME A picture with the executorAdded call from TaskSchedulerImpl to DAGScheduler.
resourceOffers requests BlacklistTracker to applyBlacklistTimeout and filters out offers on blacklisted nodes and executors.
> **Note**: resourceOffers uses the optional BlacklistTracker that was given when TaskSchedulerImpl was created.

> **Caution**: FIXME Expand on blacklisting
resourceOffers then randomly shuffles offers (to evenly distribute tasks across executors and avoid over-utilizing some of them) and initializes the local data structures tasks and availableCpus.
resourceOffers takes TaskSets in scheduling order from top-level Schedulable Pool.
> **Note**: TaskSetManager manages execution of the tasks in a single TaskSet that represents a single Stage.
For every TaskSetManager (in scheduling order), you should see the following DEBUG message in the logs:
DEBUG TaskSchedulerImpl: parentName: [name], name: [name], runningTasks: [count]
Only if a new executor was added does resourceOffers notify every TaskSetManager about the change (to recompute locality preferences).
resourceOffers then takes every TaskSetManager (in scheduling order) and offers them each node in increasing order of locality levels (per TaskSetManager’s valid locality levels).
> **Note**: A TaskSetManager computes locality levels of the tasks it manages.
For every TaskSetManager and the TaskSetManager's valid locality level, resourceOffers calls resourceOfferSingleTaskSet as long as the TaskSetManager manages to launch a task (given the locality level).
If resourceOffers did not manage to offer resources to a TaskSetManager so it could launch any task, resourceOffers requests the TaskSetManager to abort the TaskSet if it is completely blacklisted.
When resourceOffers managed to launch a task, the internal hasLaunchedTask flag gets enabled (that effectively means what the name says "there were executors and I managed to launch a task").
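The core of the algorithm can be sketched as follows. This is a simplified, self-contained model (not the actual Spark sources): the WorkerOffer and TaskDescription case classes stand in for the real classes, a task set is modeled as a function that may produce a task for an executor, and locality levels, blacklisting and the DAGScheduler notifications are omitted.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Simplified stand-ins for the real Spark classes.
case class WorkerOffer(executorId: String, host: String, cores: Int)
case class TaskDescription(taskId: Long, executorId: String)

val CPUS_PER_TASK = 1  // spark.task.cpus

def resourceOffers(
    offers: Seq[WorkerOffer],
    sortedTaskSets: Seq[String => Option[TaskDescription]]): Seq[Seq[TaskDescription]] = {
  // Randomly shuffle offers to avoid always placing tasks on the same executors.
  val shuffledOffers = Random.shuffle(offers)
  // One buffer of task descriptions per offer, plus the offer's free cores.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
  val availableCpus = shuffledOffers.map(_.cores).toArray

  // Offer every executor to every task set, in scheduling order.
  for (taskSet <- sortedTaskSets; i <- shuffledOffers.indices) {
    var launchedTask = true
    while (launchedTask && availableCpus(i) >= CPUS_PER_TASK) {
      taskSet(shuffledOffers(i).executorId) match {
        case Some(task) =>
          tasks(i) += task
          availableCpus(i) -= CPUS_PER_TASK
        case None =>
          launchedTask = false
      }
    }
  }
  tasks.map(_.toSeq)
}
```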
TaskLocality — Task Locality Preference
TaskLocality represents a task locality preference and can be one of the following (from most localized to the widest):
- PROCESS_LOCAL
- NODE_LOCAL
- NO_PREF
- RACK_LOCAL
- ANY
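The ordering can be modeled as a Scala Enumeration, as in the following sketch (a simplified stand-in for the real org.apache.spark.scheduler.TaskLocality; the isAllowed helper is illustrative):

```scala
object TaskLocality extends Enumeration {
  // Declared from the most localized to the widest, so the ordinal encodes the preference.
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

  // A locality constraint allows a condition that is at least as localized.
  def isAllowed(constraint: Value, condition: Value): Boolean = condition <= constraint
}

assert(TaskLocality.PROCESS_LOCAL < TaskLocality.ANY)
assert(TaskLocality.isAllowed(TaskLocality.RACK_LOCAL, TaskLocality.NODE_LOCAL))
```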
WorkerOffer — Free CPU Cores on Executor
WorkerOffer(executorId: String, host: String, cores: Int)
WorkerOffer represents a resource offer with free CPU cores available on an executor (by executorId) on a host.
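For illustration, a scheduler backend could build offers like the following (reusing the simplified WorkerOffer case class from the resourceOffers sketch above; executor ids, hosts and core counts are made up):

```scala
val offers = Seq(
  WorkerOffer(executorId = "exec-1", host = "host-a", cores = 4),
  WorkerOffer(executorId = "exec-2", host = "host-b", cores = 2)
)
// TaskSchedulerImpl.resourceOffers answers such offers with at most one
// Seq[TaskDescription] per offer, each holding the tasks to launch on that executor.
```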
executorLost Method
> **Caution**: FIXME
mapOutputTracker
> **Caution**: FIXME
starvationTimer
> **Caution**: FIXME
executorHeartbeatReceived Method
```scala
executorHeartbeatReceived(
    execId: String,
    accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
    blockManagerId: BlockManagerId): Boolean
```
executorHeartbeatReceived is…
> **Caution**: FIXME

> **Note**: executorHeartbeatReceived is a part of the TaskScheduler Contract.
Cancelling Tasks for Stage — cancelTasks Method
cancelTasks(stageId: Int, interruptThread: Boolean): Unit
cancelTasks cancels all tasks submitted for execution in a stage stageId.
> **Note**: It is currently called by DAGScheduler when it cancels a stage.
handleSuccessfulTask Method
```scala
handleSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskResult: DirectTaskResult[_]): Unit
```
handleSuccessfulTask simply forwards the call to the input taskSetManager (passing tid and taskResult).
> **Note**: handleSuccessfulTask is called when TaskResultGetter has managed to deserialize the task result of a task that finished successfully.
handleTaskGettingResult Method
handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit
handleTaskGettingResult simply forwards the call to the taskSetManager.
> **Note**: handleTaskGettingResult is used to inform that TaskResultGetter has enqueued a successful task with an IndirectTaskResult task result (and so is about to fetch a remote block from a BlockManager).
schedulableBuilder Attribute
schedulableBuilder is a SchedulableBuilder for the TaskSchedulerImpl.
It is set up when a TaskSchedulerImpl is initialized and can be one of two available builders:
- FIFOSchedulableBuilder when the scheduling policy is FIFO (which is the default scheduling policy).
- FairSchedulableBuilder for the FAIR scheduling policy.

> **Note**: Use the spark.scheduler.mode setting to select the scheduling policy.
Tracking Racks per Hosts and Ports — getRackForHost Method
getRackForHost(value: String): Option[String]
getRackForHost is a method to know about the racks per hosts and ports. By default, it assumes that racks are unknown (i.e. the method returns None).
> **Note**: It is overridden by the YARN-specific TaskScheduler, YarnScheduler.
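A minimal sketch of the default behaviour and a rack-aware override (simplified stand-in classes, not the actual Spark sources):

```scala
class BaseTaskScheduler {
  // By default racks are unknown.
  def getRackForHost(host: String): Option[String] = None
}

// A YARN-like scheduler could resolve racks from the cluster topology
// (here a plain Map plays the role of the topology service).
class RackAwareTaskScheduler(rackByHost: Map[String, String]) extends BaseTaskScheduler {
  override def getRackForHost(host: String): Option[String] = rackByHost.get(host)
}

val scheduler = new RackAwareTaskScheduler(Map("host-a" -> "rack-1"))
scheduler.getRackForHost("host-a")  // Some(rack-1)
scheduler.getRackForHost("host-z")  // None
```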
getRackForHost is currently used in two places:
- TaskSchedulerImpl.resourceOffers to track hosts per rack (using the internal hostsByRack registry) while processing resource offers.
- TaskSetManager.addPendingTask, TaskSetManager.dequeueTask, and TaskSetManager.dequeueSpeculativeTask
Creating TaskSchedulerImpl Instance
TaskSchedulerImpl takes the following when created:

- a SparkContext
- the acceptable number of task failures (maxTaskFailures)
- an optional BlacklistTracker
- an optional isLocal flag to differentiate between local and cluster run modes
TaskSchedulerImpl initializes the internal registries and counters.
> **Note**: There is another TaskSchedulerImpl constructor that requires a SparkContext object only and sets maxTaskFailures to spark.task.maxFailures or, if not set, defaults to 4.
TaskSchedulerImpl sets schedulingMode to the value of spark.scheduler.mode setting (defaults to FIFO).
> **Note**: schedulingMode is part of the TaskScheduler Contract.
Failure to set schedulingMode results in a SparkException:
Unrecognized spark.scheduler.mode: [schedulingModeConf]
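A sketch of how the setting could be resolved (simplified and self-contained; SchedulingMode and SparkException below are stand-ins for the real Spark types):

```scala
object SchedulingMode extends Enumeration {
  val FAIR, FIFO, NONE = Value
}
class SparkException(message: String) extends Exception(message)

def resolveSchedulingMode(schedulingModeConf: String): SchedulingMode.Value =
  try {
    // Enumeration.withName is case-sensitive, hence the upper-casing.
    SchedulingMode.withName(schedulingModeConf.toUpperCase)
  } catch {
    case _: NoSuchElementException =>
      throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
  }

resolveSchedulingMode("fifo")     // FIFO
// resolveSchedulingMode("yarn")  // throws SparkException
```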
Ultimately, TaskSchedulerImpl creates a TaskResultGetter.
Initializing TaskSchedulerImpl — initialize Method
initialize(backend: SchedulerBackend): Unit
initialize initializes a TaskSchedulerImpl object.
> **Note**: initialize is called while SparkContext is being created, as part of creating the SchedulerBackend and TaskScheduler.
initialize saves the reference to the current SchedulerBackend (as backend) and sets rootPool to an empty-named Pool with the schedulingMode (already resolved while creating the TaskSchedulerImpl object) and with initMinShare and initWeight both 0.
> **Note**: schedulingMode and rootPool are a part of the TaskScheduler Contract.
It then creates the internal SchedulableBuilder object (as schedulableBuilder) based on schedulingMode:
- FIFOSchedulableBuilder for FIFO scheduling mode
- FairSchedulableBuilder for FAIR scheduling mode
With the schedulableBuilder object created, initialize requests it to build pools.
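A simplified, self-contained model of initialize is shown below (all types are stand-ins; the real Pool, SchedulableBuilder and SchedulerBackend live in org.apache.spark.scheduler):

```scala
trait SchedulerBackend
trait SchedulableBuilder { def buildPools(): Unit }
class Pool(name: String, schedulingMode: String, initMinShare: Int, initWeight: Int)
class FIFOSchedulableBuilder(rootPool: Pool) extends SchedulableBuilder {
  def buildPools(): Unit = ()   // FIFO needs no extra pools
}
class FairSchedulableBuilder(rootPool: Pool) extends SchedulableBuilder {
  def buildPools(): Unit = ()   // would read the fair scheduler allocation file
}

class SimpleTaskScheduler(schedulingMode: String) {
  var backend: SchedulerBackend = _
  var rootPool: Pool = _
  var schedulableBuilder: SchedulableBuilder = _

  def initialize(b: SchedulerBackend): Unit = {
    backend = b
    // An empty-named root pool with the already-resolved scheduling mode.
    rootPool = new Pool("", schedulingMode, initMinShare = 0, initWeight = 0)
    schedulableBuilder = schedulingMode match {
      case "FIFO" => new FIFOSchedulableBuilder(rootPool)
      case "FAIR" => new FairSchedulableBuilder(rootPool)
    }
    schedulableBuilder.buildPools()
  }
}
```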
> **Caution**: FIXME Why are rootPool and schedulableBuilder created only now? What do they need that is not available when TaskSchedulerImpl is created?
Starting TaskSchedulerImpl — start Method
As part of initialization of a SparkContext, TaskSchedulerImpl is started (using start from the TaskScheduler Contract).
start(): Unit
start starts the scheduler backend.
Figure: TaskSchedulerImpl in Spark Standalone

start also starts the task-scheduler-speculation executor service.
task-scheduler-speculation Scheduled Executor Service — speculationScheduler Internal Attribute
speculationScheduler is a java.util.concurrent.ScheduledExecutorService with the name task-scheduler-speculation for speculative execution of tasks.
When TaskSchedulerImpl starts (in non-local run mode) with spark.speculation enabled, speculationScheduler is used to schedule checkSpeculatableTasks to execute periodically every spark.speculation.interval after the initial spark.speculation.interval passes.
speculationScheduler is shut down when TaskSchedulerImpl stops.
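The scheduling of the periodic check can be sketched with a plain ScheduledExecutorService (Spark itself uses a ThreadUtils helper to create the daemon thread; the interval value below is illustrative):

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

val speculationIntervalMs = 100L  // spark.speculation.interval

val threadFactory = new ThreadFactory {
  def newThread(r: Runnable): Thread = {
    val t = new Thread(r, "task-scheduler-speculation")
    t.setDaemon(true)
    t
  }
}
val speculationScheduler = Executors.newSingleThreadScheduledExecutor(threadFactory)

speculationScheduler.scheduleWithFixedDelay(new Runnable {
  def run(): Unit = println("checkSpeculatableTasks()")  // would call checkSpeculatableTasks
}, speculationIntervalMs, speculationIntervalMs, TimeUnit.MILLISECONDS)

// ...and when the scheduler stops:
// speculationScheduler.shutdown()
```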
Checking for Speculatable Tasks — checkSpeculatableTasks Method
checkSpeculatableTasks(): Unit
checkSpeculatableTasks requests rootPool to check for speculatable tasks (i.e. tasks that have run for more than 100 ms) and, if there are any, requests the SchedulerBackend to revive offers.
> **Note**: checkSpeculatableTasks is executed periodically as part of speculative execution of tasks.
Acceptable Number of Task Failures — maxTaskFailures Attribute
The acceptable number of task failures (maxTaskFailures) can be explicitly defined when creating TaskSchedulerImpl instance or based on spark.task.maxFailures setting that defaults to 4 failures.
> **Note**: It is exclusively used when submitting tasks through TaskSetManager.
Cleaning up After Removing Executor — removeExecutor Internal Method
removeExecutor(executorId: String, reason: ExecutorLossReason): Unit
removeExecutor removes the executorId executor from the following internal registries: executorIdToTaskCount, executorIdToHost, executorsByHost, and hostsByRack. If the affected hosts and racks are the last entries in executorsByHost and hostsByRack, respectively, they are removed from those registries.
Unless reason is LossReasonPending, the executor is removed from executorIdToHost registry and TaskSetManagers get notified.
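The registry clean-up can be modeled roughly as follows (a sketch with plain mutable maps standing in for the internal registries; the loss reason and the TaskSetManager notifications are left out):

```scala
import scala.collection.mutable

val executorIdToHost = mutable.Map("exec-1" -> "host-a")
val executorsByHost  = mutable.Map("host-a" -> mutable.Set("exec-1"))
val hostsByRack      = mutable.Map("rack-1" -> mutable.Set("host-a"))
def rackOf(host: String): Option[String] = Some("rack-1")  // hypothetical rack resolver

def removeExecutor(executorId: String): Unit = {
  for (host <- executorIdToHost.remove(executorId);
       executors <- executorsByHost.get(host)) {
    executors -= executorId
    if (executors.isEmpty) {
      // The host has no executors left, so drop it (and possibly its rack) as well.
      executorsByHost -= host
      for (rack <- rackOf(host); hosts <- hostsByRack.get(rack)) {
        hosts -= host
        if (hosts.isEmpty) hostsByRack -= rack
      }
    }
  }
}

removeExecutor("exec-1")
// executorsByHost and hostsByRack are now empty
```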
> **Note**: The internal removeExecutor is called as part of statusUpdate and executorLost.
Intercepting Nearly-Completed SparkContext Initialization — postStartHook Callback
postStartHook is a custom implementation of postStartHook from the TaskScheduler Contract that waits until a scheduler backend is ready (using the internal blocking waitBackendReady).
> **Note**: postStartHook is used when SparkContext is created (just before it is fully initialized) and by YarnClusterScheduler.postStartHook.
Stopping TaskSchedulerImpl — stop Method
stop(): Unit
stop() stops all the internal services, i.e. task-scheduler-speculation executor service, SchedulerBackend, TaskResultGetter, and starvationTimer timer.
Calculating Default Level of Parallelism — defaultParallelism Method
Default level of parallelism is a hint for sizing jobs. It is a part of the TaskScheduler contract and used by SparkContext to create RDDs with the right number of partitions when not specified explicitly.
TaskSchedulerImpl uses SchedulerBackend.defaultParallelism() to calculate the value, i.e. it just passes it along to a scheduler backend.
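In code the delegation is a one-liner, sketched below (SchedulerBackend is a stand-in trait here):

```scala
trait SchedulerBackend { def defaultParallelism(): Int }

class SimpleTaskScheduler(backend: SchedulerBackend) {
  // TaskSchedulerImpl simply asks its scheduler backend.
  def defaultParallelism(): Int = backend.defaultParallelism()
}
```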
Submitting Tasks (from TaskSet) — submitTasks Method
> **Note**: submitTasks is a part of the TaskScheduler Contract.
submitTasks(taskSet: TaskSet): Unit
submitTasks creates a TaskSetManager for the input TaskSet and adds it to the Schedulable root pool.
> **Note**: The root pool can be a single flat linked queue (in FIFO scheduling mode) or a hierarchy of pools of Schedulables (in FAIR scheduling mode).
It makes sure that the requested resources, i.e. CPU and memory, are assigned to the Spark application for a non-local environment before requesting the current SchedulerBackend to revive offers.
> **Note**: If there are tasks to launch for missing partitions in a stage, DAGScheduler executes submitTasks (see submitMissingTasks for Stage and Job).
When submitTasks is called, you should see the following INFO message in the logs:
INFO TaskSchedulerImpl: Adding task set [id] with [length] tasks
It creates a new TaskSetManager for the input taskSet and the acceptable number of task failures.
> **Note**: The acceptable number of task failures is specified when a TaskSchedulerImpl is created.

> **Note**: A TaskSet knows the tasks to execute (as tasks) and the stage id (as stageId) the tasks belong to. Read TaskSets.
The TaskSet is registered in the internal taskSetsByStageIdAndAttempt registry with the TaskSetManager.
If there is more than one active TaskSetManager for the stage, an IllegalStateException is thrown with the message:
more than one active taskSet for stage [stage]: [TaskSet ids]
> **Note**: TaskSetManager is considered active when it is not a zombie.
The TaskSetManager is added to the Schedulable pool (via SchedulableBuilder).
When the method is called the very first time (hasReceivedTask is false) in cluster mode only (i.e. isLocal of the TaskSchedulerImpl is false), starvationTimer is scheduled to execute after spark.starvation.timeout to ensure that the requested resources, i.e. CPUs and memory, were assigned by a cluster manager.
> **Note**: After the first spark.starvation.timeout passes, the internal hasReceivedTask flag becomes true.
Every time the starvation timer thread is executed and hasLaunchedTask flag is false, the following WARN message is printed out to the logs:
WARN Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
Otherwise, when the hasLaunchedTask flag is true the timer thread cancels itself.
Ultimately, submitTasks requests the SchedulerBackend to revive offers.
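Putting the steps together, submitTasks can be modeled as in the following simplified, self-contained sketch (stand-in types; the starvation timer, synchronization and real logging are omitted):

```scala
import scala.collection.mutable

case class TaskSet(stageId: Int, stageAttemptId: Int, numTasks: Int)
class TaskSetManager(val taskSet: TaskSet, maxTaskFailures: Int) { var isZombie = false }
trait SchedulerBackend { def reviveOffers(): Unit }

class SimpleTaskScheduler(backend: SchedulerBackend, maxTaskFailures: Int) {
  private val taskSetsByStageIdAndAttempt =
    mutable.Map.empty[Int, mutable.Map[Int, TaskSetManager]]
  private val rootPool = mutable.Buffer.empty[TaskSetManager]   // a flat FIFO "pool"

  def submitTasks(taskSet: TaskSet): Unit = {
    println(s"Adding task set ${taskSet.stageId}.${taskSet.stageAttemptId} " +
      s"with ${taskSet.numTasks} tasks")
    val manager = new TaskSetManager(taskSet, maxTaskFailures)
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(taskSet.stageId, mutable.Map.empty)
    stageTaskSets(taskSet.stageAttemptId) = manager
    // Only one non-zombie TaskSetManager per stage may be active at any time.
    val conflicting = stageTaskSets.values.exists(m => m != manager && !m.isZombie)
    if (conflicting) {
      throw new IllegalStateException(
        s"more than one active taskSet for stage ${taskSet.stageId}")
    }
    rootPool += manager       // in Spark: schedulableBuilder.addTaskSetManager(manager, ...)
    backend.reviveOffers()    // ask the backend for resource offers
  }
}
```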
> **Tip**: Use the dag-scheduler-event-loop thread to step through the code in a debugger.
resourceOfferSingleTaskSet Method
```scala
resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: Seq[ArrayBuffer[TaskDescription]]): Boolean
```
resourceOfferSingleTaskSet is a private helper method that is executed when…
statusUpdate Method
```scala
statusUpdate(
    tid: Long,
    state: TaskState.TaskState,
    serializedData: ByteBuffer): Unit
```
statusUpdate removes a lost executor when the tid task has failed. For all task states, statusUpdate removes the tid task from the internal registries, i.e. taskIdToTaskSetManager and taskIdToExecutorId, and decrements the number of running tasks in the executorIdToTaskCount registry. For a tid in FINISHED, FAILED, KILLED or LOST state, statusUpdate informs the TaskSetManager that the task can be removed from the running tasks. For a tid in FINISHED state, statusUpdate schedules an asynchronous task to deserialize the task result (and notify TaskSchedulerImpl), while for FAILED, KILLED or LOST states it calls TaskResultGetter.enqueueFailedTask. Ultimately, given an executor that has been lost, statusUpdate informs DAGScheduler that the executor was lost and requests the SchedulerBackend to revive offers.
For tid task in LOST state and an executor still assigned for the task and tracked in executorIdToTaskCount registry, the executor is removed (with reason Task [tid] was lost, so marking the executor as lost as well.).
> **Caution**: FIXME Why is SchedulerBackend.reviveOffers() called only for lost executors?
statusUpdate looks up the TaskSetManager for tid (in taskIdToTaskSetManager registry).
When the TaskSetManager is found and the task is in a finished state, the task is removed from the internal registries, i.e. taskIdToTaskSetManager and taskIdToExecutorId, and the number of currently running tasks for the executor is decremented (in executorIdToTaskCount registry).
For a task in FINISHED state, the task is removed from the running tasks and an asynchronous task is scheduled to deserialize the task result (and notify TaskSchedulerImpl).
For a task in FAILED, KILLED, or LOST state, the task is removed from the running tasks (as for the FINISHED state) and then TaskResultGetter.enqueueFailedTask is called.
If the TaskSetManager for tid could not be found (in taskIdToTaskSetManager registry), you should see the following ERROR message in the logs:
ERROR Ignoring update with state [state] for TID [tid] because its task set is gone (this is likely the result of receiving duplicate task finished status updates)
Any exception is caught and reported as an ERROR message in the logs:
ERROR Exception in statusUpdate
Ultimately, for tid task with an executor marked as lost, statusUpdate informs DAGScheduler that the executor was lost (with SlaveLost and the reason Task [tid] was lost, so marking the executor as lost as well.) and SchedulerBackend is requested to revive offers.
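The per-state branching can be summarized with the following sketch (TaskState is modeled as a plain Enumeration; the strings merely name the TaskResultGetter calls the real method makes):

```scala
object TaskState extends Enumeration {
  val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
  def isFinished(state: Value): Boolean = Set(FINISHED, FAILED, KILLED, LOST)(state)
}

def actionFor(state: TaskState.Value): String = state match {
  case TaskState.FINISHED =>
    "enqueueSuccessfulTask"   // deserialize the task result asynchronously
  case TaskState.FAILED | TaskState.KILLED | TaskState.LOST =>
    "enqueueFailedTask"       // hand the failure over to TaskResultGetter
  case _ =>
    "no-op"                   // LAUNCHING / RUNNING updates need no result handling
}

actionFor(TaskState.FINISHED)  // enqueueSuccessfulTask
```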
> **Caution**: FIXME An image with scheduler backends calling TaskSchedulerImpl.statusUpdate.

> **Note**: statusUpdate is used when CoarseGrainedSchedulerBackend, LocalSchedulerBackend and MesosFineGrainedSchedulerBackend inform about changes in task states.
Notifying TaskSetManager that Task Failed — handleFailedTask Method
```scala
handleFailedTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskState: TaskState,
    reason: TaskFailedReason): Unit
```
handleFailedTask notifies taskSetManager that tid task has failed and, only when taskSetManager is not in zombie state and tid is not in KILLED state, requests SchedulerBackend to revive offers.
> **Note**: handleFailedTask is called when TaskResultGetter deserializes a TaskFailedReason for a failed task.
taskSetFinished Method
taskSetFinished(manager: TaskSetManager): Unit
taskSetFinished looks up all the TaskSets for the stage id (in the taskSetsByStageIdAndAttempt registry) and removes the given stage attempt from them, possibly removing the entire stage record from the taskSetsByStageIdAndAttempt registry completely (if there are no other attempts registered).
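A minimal sketch of that clean-up, with a plain mutable map standing in for the registry (removal from the parent pool and logging are left out):

```scala
import scala.collection.mutable

// stageId -> (stageAttemptId -> TaskSetManager), here simplified to a String
val taskSetsByStageIdAndAttempt = mutable.Map(1 -> mutable.Map(0 -> "TaskSetManager(1.0)"))

def taskSetFinished(stageId: Int, stageAttemptId: Int): Unit = {
  for (attempts <- taskSetsByStageIdAndAttempt.get(stageId)) {
    attempts -= stageAttemptId
    if (attempts.isEmpty) taskSetsByStageIdAndAttempt -= stageId
  }
}

taskSetFinished(stageId = 1, stageAttemptId = 0)
// taskSetsByStageIdAndAttempt is now empty
```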
> **Note**: A TaskSetManager manages a TaskSet for a stage.
taskSetFinished then removes manager from the parent’s schedulable pool.
You should see the following INFO message in the logs:
INFO Removed TaskSet [id], whose tasks have all completed, from pool [name]
> **Note**: taskSetFinished method is called when TaskSetManager has received the results of all the tasks in a TaskSet.
Notifying DAGScheduler About New Executor — executorAdded Method
executorAdded(execId: String, host: String)
executorAdded just notifies DAGScheduler that an executor was added.
> **Caution**: FIXME An image with a call from TaskSchedulerImpl to DAGScheduler, please.

> **Note**: executorAdded uses the DAGScheduler that was given when setDAGScheduler was called.
Waiting Until SchedulerBackend is Ready — waitBackendReady Internal Method
waitBackendReady(): Unit
waitBackendReady waits until a SchedulerBackend is ready.
> **Note**: SchedulerBackend is ready by default.
waitBackendReady keeps checking the status every 100 milliseconds until SchedulerBackend is ready or the SparkContext is stopped.
If the SparkContext happens to be stopped while waiting, waitBackendReady throws an IllegalStateException:
Spark context stopped while waiting for backend
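The waiting loop can be sketched as follows (isReady and isStopped are stand-in callbacks; the real method checks the SchedulerBackend and the SparkContext directly and waits on a monitor rather than sleeping):

```scala
def waitBackendReady(isReady: () => Boolean, isStopped: () => Boolean): Unit = {
  if (isReady()) return
  while (!isReady()) {
    if (isStopped()) {
      throw new IllegalStateException("Spark context stopped while waiting for backend")
    }
    Thread.sleep(100)  // check the status every 100 milliseconds
  }
}
```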
> **Note**: waitBackendReady is used when TaskSchedulerImpl is notified that SparkContext is about to be fully initialized.
Settings
| Spark Property | Default Value | Description |
|---|---|---|
| spark.task.maxFailures | 4 | The number of individual task failures before giving up on the entire TaskSet and the job afterwards. |
| spark.task.cpus | 1 | The number of CPU cores per task. |
| spark.starvation.timeout | 15s | Threshold above which Spark warns a user that an initial TaskSet may be starved. |
| spark.scheduler.mode | FIFO | A case-insensitive name of the scheduling mode. NOTE: Only FAIR, FIFO, and NONE are supported. |
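For reference, these properties are ordinarily set on a SparkConf (or via spark-submit --conf); the values below are purely illustrative:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("scheduler-settings-demo")
  .setMaster("local[*]")
  .set("spark.scheduler.mode", "FAIR")   // FAIR or FIFO (the default)
  .set("spark.task.maxFailures", "8")    // acceptable task failures (default: 4)
  .set("spark.task.cpus", "1")           // CPU cores per task (default: 1)
```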