Query Execution

QueryExecution is an integral part of a Dataset and represents the query execution that will eventually "produce" the data in a Dataset (i.e. when toRdd is called and an RDD[InternalRow] is computed).

Note
QueryExecution is a transient feature of a Dataset, i.e. it is not preserved across serializations.

You can access the QueryExecution of a Dataset using the queryExecution attribute.

val ds: Dataset[Long] = ...
val queryExec = ds.queryExecution

QueryExecution is the result of executing a LogicalPlan in a SparkSession (and so you can create a Dataset from a logical operator or work with the QueryExecution of an already-executed logical operator).
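For instance, you can get at the logical plan a Dataset was built from through its QueryExecution (a minimal sketch; range merely stands in for any query):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

val ds = spark.range(5)
// Every Dataset carries a QueryExecution built around a logical plan
val plan: LogicalPlan = ds.queryExecution.logical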

Table 1. QueryExecution Lazy Attributes
Attribute Description

analyzed

Result of applying the Analyzer's rules to the LogicalPlan (of the QueryExecution).

withCachedData

LogicalPlan that is the analyzed plan after it has been checked for unsupported operations and had cached data segments substituted in.

optimizedPlan

LogicalPlan (of a structured query) that is the result of applying the session-owned Catalyst Query Optimizer to withCachedData.

sparkPlan

SparkPlan that is the result of requesting SparkPlanner to plan an optimized logical query plan.

NOTE: In fact, the resulting SparkPlan is the first query plan from the collection of possible query plans produced by SparkPlanner.

executedPlan

SparkPlan ready for execution, i.e. sparkPlan with all the preparation rules applied.

toRdd

RDD[InternalRow] that is the result of "executing" the physical plan, i.e. executedPlan.execute().

TIP: InternalRow is the internal optimized binary row format.

You can access the lazy attributes as follows:

val dataset: Dataset[Long] = ...
dataset.queryExecution.executedPlan
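
For instance, you can walk a simple query through all the stages listed in Table 1:

val qe = spark.range(5).queryExecution
qe.analyzed       // LogicalPlan after analysis
qe.withCachedData // analyzed plan with cached data segments substituted in
qe.optimizedPlan  // LogicalPlan after optimization
qe.sparkPlan      // SparkPlan chosen by SparkPlanner
qe.executedPlan   // SparkPlan with the preparation rules applied
qe.toRdd          // RDD[InternalRow] behind the Dataset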

When created, QueryExecution uses the input SparkSession to access the current SparkPlanner (through SessionState, which could also be a HiveSessionState). It then uses the planner to compute a SparkPlan (i.e. a physical plan), which is available as the sparkPlan attribute.

A streaming variant of QueryExecution is IncrementalExecution.

Tip
Use the explain operator to review the logical and physical plans of a Dataset.
val ds = spark.range(5)
scala> ds.queryExecution
res17: org.apache.spark.sql.execution.QueryExecution =
== Parsed Logical Plan ==
Range 0, 5, 1, 8, [id#39L]

== Analyzed Logical Plan ==
id: bigint
Range 0, 5, 1, 8, [id#39L]

== Optimized Logical Plan ==
Range 0, 5, 1, 8, [id#39L]

== Physical Plan ==
WholeStageCodegen
:  +- Range 0, 1, 8, 5, [id#39L]
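
You can also have the plans printed to the console directly; with extended enabled, explain includes the logical plans besides the physical one:

val ds = spark.range(5)
// Prints the parsed, analyzed and optimized logical plans and the physical plan
ds.explain(extended = true)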
Caution
FIXME What’s planner? analyzed? Why do we need assertSupported?

QueryExecution belongs to the org.apache.spark.sql.execution package.

hiveResultString Method

hiveResultString(): Seq[String]

hiveResultString returns the result as a Hive-compatible sequence of strings.

scala> spark.range(5).queryExecution.hiveResultString
res0: Seq[String] = ArrayBuffer(0, 1, 2, 3, 4)

scala> spark.read.csv("people.csv").queryExecution.hiveResultString
res4: Seq[String] = ArrayBuffer(id	name	age, 0	Jacek	42)
Caution
FIXME

Internally, hiveResultString branches off per the type of the executedPlan: command plans (e.g. describe-table commands) have their results rendered in Hive's output format, while other queries are executed and every value in the result rows is converted to a Hive-compatible string representation (with columns separated by tabs, as in the CSV example above).

Note
hiveResultString is used when the Thrift JDBC/ODBC server's SparkSQLDriver runs a command and renders its results.

Creating QueryExecution Instance

class QueryExecution(
  val sparkSession: SparkSession,
  val logical: LogicalPlan)

QueryExecution requires a SparkSession and a LogicalPlan.
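
You can thus create a QueryExecution for a logical plan yourself (a sketch; the logical plan is borrowed from an existing Dataset):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution

val logical: LogicalPlan = spark.range(5).queryExecution.logical
val qe = new QueryExecution(spark, logical)
// Accessing executedPlan triggers analysis, optimization and planning
qe.executedPlan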

Accessing SparkPlanner — planner Method

planner: SparkPlanner

planner returns the current SparkPlanner.

planner merely exposes the internal planner of the current SessionState.

preparations — SparkPlan Preparation Rules (to apply before Query Execution)

preparations is a sequence of SparkPlan optimization rules.

Tip
A SparkPlan optimization rule transforms a physical operator (aka SparkPlan) to another (possibly more efficient) SparkPlan.

The preparations collection is an intermediate phase of query execution that developers can use to introduce further optimizations.

The current list of SparkPlan transformations in preparations is as follows:

  1. ExtractPythonUDFs

  2. PlanSubqueries

  3. EnsureRequirements

  4. CollapseCodegenStages

  5. ReuseExchange

  6. ReuseSubquery

Note
The transformation rules are applied sequentially, in the order listed, to the physical plan before execution, i.e. they produce the final SparkPlan when the executedPlan lazy value is accessed.
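
Conceptually, applying the rules amounts to a left fold over sparkPlan (a sketch that mirrors what QueryExecution does internally when computing executedPlan):

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// Every preparation rule transforms the physical plan in turn
def prepareForExecution(rules: Seq[Rule[SparkPlan]], plan: SparkPlan): SparkPlan =
  rules.foldLeft(plan) { case (current, rule) => rule(current) }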

IncrementalExecution

IncrementalExecution is a custom QueryExecution that additionally takes an OutputMode, a checkpointLocation, and a currentBatchId.

It lives in the org.apache.spark.sql.execution.streaming package.

Caution
FIXME What is stateStrategy?

Stateful operators in the query plan are numbered using operatorId, which starts at 0.

IncrementalExecution adds one Rule[SparkPlan] called state as the first element of the preparations sequence of rules.
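
Schematically, the pattern looks as follows (a simplified sketch, not the actual IncrementalExecution source; the real class also carries the streaming-specific parameters mentioned above):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}

class StreamingQueryExecutionSketch(
    session: SparkSession,
    logicalPlan: LogicalPlan,
    state: Rule[SparkPlan])
  extends QueryExecution(session, logicalPlan) {

  // state is prepended so that it runs before the standard preparation rules
  override def preparations: Seq[Rule[SparkPlan]] = state +: super.preparations
}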

Caution
FIXME What does IncrementalExecution do? Where is it used?
