Task

In Spark, a task (aka command) is the smallest individual unit of execution that can compute a RDD partition.

spark rdd partitions job stage tasks.png
Figure 1. Tasks correspond to partitions in RDD

A task is described by the Task contract with a single runTask method.

There are two concrete implementations of Task contract:

  • ShuffleMapTask that executes a task and divides the task’s output to multiple buckets (based on the task’s partitioner).

  • ResultTask that executes a task and sends the task’s output back to the driver application.

The very last stage in a Spark job consists of multiple ResultTasks, while earlier stages can only be ShuffleMapTasks.

Caution
FIXME You could have a Spark job with ShuffleMapTask being the last.

In other (more technical) words, a task is a computation on a data partition in a stage of a RDD in a Spark job.

Note
T is the type defined when a Task is created.
Table 1. Task Internal Registries and Counters
Name Description

metrics

Used when ???

taskMemoryManager

TaskMemoryManager that manages the memory allocated by the task.

Used when ???

context

Used when ???

_killed

Used when ???

_executorDeserializeTime

Used when ???

_executorDeserializeCpuTime

Used when ???

taskThread

Used when ???

epoch

Set for a Task when TaskSetManager is created and later used when TaskRunner runs and when DAGScheduler handles a ShuffleMapTask successful completion.

A task can only belong to one stage and operate on a single partition. All tasks in a stage must be completed before the stages that follow can start.

Tasks are spawned one by one for each stage and partition.

Caution
FIXME What are stageAttemptId and taskAttemptId?

Task Contract

def runTask(context: TaskContext): T
def preferredLocations: Seq[TaskLocation] = Nil
Note
Task is a private[spark] contract.
Table 2. Task Contract
Method Description

runTask

Used when a task runs.

preferredLocations

Used when…​FIXME

Creating Task Instance

Task[T](
  val stageId: Int,
  val stageAttemptId: Int,
  val partitionId: Int,
  var localProperties: Properties = new Properties,
  serializedTaskMetrics: Array[Byte] =
    SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array(),
  val jobId: Option[Int] = None,
  val appId: Option[String] = None,
  val appAttemptId: Option[String] = None)
extends Serializable

Task Attributes

A Task instance is uniquely identified by the following task attributes:

  • stageId - there can be many stages in a job. Every stage has its own unique stageId that the task belongs to.

  • stageAttemptId - a stage can be re-attempted for execution in case of failure. stageAttemptId represents the attempt id of a stage that the task belongs to.

  • partitionId - a task is a unit of work on a partitioned distributed dataset. Every partition has its own unique partitionId that a task processes.

  • metrics - an instance of TaskMetrics for the task.

  • localProperties - local private properties of the task.

Running Task Thread — run Method

run(
  taskAttemptId: Long,
  attemptNumber: Int,
  metricsSystem: MetricsSystem): T

run creates a TaskContextImpl that in turn becomes the task’s TaskContext.

Note
run is a final method and so must not be overriden.

run checks _killed flag and, if enabled, kills the task (with interruptThread flag disabled).

run creates a Hadoop CallerContext and sets it.

Note
This is the moment when the custom Task's runTask is executed.

In the end, run notifies TaskContextImpl that the task has completed (regardless of the final outcome — a success or a failure).

In case of any exceptions, run notifies TaskContextImpl that the task has failed. run requests MemoryStore to release unroll memory for this task (for both ON_HEAP and OFF_HEAP memory modes).

Note
run uses SparkEnv to access BlockManager that it uses to access MemoryStore.
Note
run is used exclusively when TaskRunner starts. The Task instance has just been deserialized from taskBytes that were sent over the wire to an executor. localProperties and TaskMemoryManager are already assigned.

Task States

A task can be in one of the following states:

  • LAUNCHING

  • RUNNING when the task is being started.

  • FINISHED when the task finished with the serialized result.

  • FAILED when the task fails, e.g. when FetchFailedException, CommitDeniedException or any Throwable occurs

  • KILLED when an executor kills a task.

  • LOST

States are the values of org.apache.spark.TaskState.

Note
Task status updates are sent from executors to the driver through ExecutorBackend.

Task is finished when it is in one of FINISHED, FAILED, KILLED, LOST

LOST and FAILED states are considered failures.

Tip
Task states correspond to org.apache.mesos.Protos.TaskState.

Collect Latest Values of Accumulators — collectAccumulatorUpdates Method

collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[AccumulableInfo]

collectAccumulatorUpdates collects the latest values of accumulators used in a task (and returns the values as a collection of AccumulableInfo).

Note
collectAccumulatorUpdates is used when TaskRunner runs a task (and sends a task’s final results).

When taskFailed is true it filters out accumulators with countFailedValues disabled.

Caution
FIXME Why is the check context != null?
Note
It uses context.taskMetrics.accumulatorUpdates().
Caution
FIXME What is context.taskMetrics.accumulatorUpdates() doing?

Killing Task — kill Method

kill(interruptThread: Boolean)

kill marks the task to be killed, i.e. it sets the internal _killed flag to true.

kill calls TaskContextImpl.markInterrupted when context is set.

If interruptThread is enabled and the internal taskThread is available, kill interrupts it.

Caution
FIXME When could context and interruptThread not be set?

results matching ""

    No results matching ""