Query Execution
QueryExecution is an integral part of a Dataset and represents the query execution that will eventually "produce" the data in a Dataset (when toRdd is called and an RDD[InternalRow] is computed).
Note: QueryExecution is a transient feature of a Dataset, i.e. it is not preserved across serializations.
You can access the QueryExecution of a Dataset using the queryExecution attribute.

```scala
val ds: Dataset[Long] = ...
val queryExec = ds.queryExecution
```

A QueryExecution is the result of executing a LogicalPlan in a SparkSession. You could therefore create a Dataset from a logical operator, or use the QueryExecution directly after executing a logical operator.
| Attribute | Description |
|---|---|
| analyzed | Result of applying the Analyzer's rules to the LogicalPlan (of the QueryExecution). |
| sparkPlan | SparkPlan that is the result of requesting SparkPlanner to plan an optimized logical query plan. NOTE: In fact, the result is the first of the candidate physical plans that SparkPlanner returns. |
| executedPlan | SparkPlan ready for execution. It is the sparkPlan plan with all the preparation rules applied. |
| toRdd | RDD[InternalRow] computed by executing executedPlan. TIP: InternalRow is the internal optimized binary row format. |
You can access the lazy attributes as follows:

```scala
val dataset: Dataset[Long] = ...
dataset.queryExecution.executedPlan
```
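The sketch below walks a Dataset through the lazy attributes in the order QueryExecution computes them. It assumes Spark 2.x or 3.x on the classpath and a local master. The names `QueryExecutionPhases` and `phases` are illustrative only. Each attribute is computed on first access, and `toRdd` is the one that actually runs the physical plan.

```scala
import org.apache.spark.sql.SparkSession

object QueryExecutionPhases {
  // String form of each plan, in the order QueryExecution derives them
  def phases(spark: SparkSession): Seq[(String, String)] = {
    val qe = spark.range(5).queryExecution
    Seq(
      "logical"       -> qe.logical.toString,       // plan the QueryExecution was created with
      "analyzed"      -> qe.analyzed.toString,      // after the Analyzer's rules
      "optimizedPlan" -> qe.optimizedPlan.toString, // after the Optimizer
      "sparkPlan"     -> qe.sparkPlan.toString,     // first physical plan from SparkPlanner
      "executedPlan"  -> qe.executedPlan.toString   // sparkPlan with the preparation rules applied
    )
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("qe-phases").getOrCreate()
    try {
      phases(spark).foreach { case (name, plan) => println(s"== $name ==\n$plan") }
      // toRdd executes executedPlan and yields an RDD[InternalRow]
      println(spark.range(5).queryExecution.toRdd.count())
    } finally spark.stop()
  }
}
```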
When it is created, QueryExecution uses the input SparkSession to access the current SparkPlanner through the SessionState, which could also be a HiveSessionState. It then uses the planner to compute a SparkPlan, i.e. a physical plan, which is available as the sparkPlan attribute.
A streaming variant of QueryExecution is IncrementalExecution.
Tip: Use the explain operator to see the logical and physical plans of a Dataset.
```scala
val ds = spark.range(5)
scala> ds.queryExecution
res17: org.apache.spark.sql.execution.QueryExecution =
== Parsed Logical Plan ==
Range 0, 5, 1, 8, [id#39L]
== Analyzed Logical Plan ==
id: bigint
Range 0, 5, 1, 8, [id#39L]
== Optimized Logical Plan ==
Range 0, 5, 1, 8, [id#39L]
== Physical Plan ==
WholeStageCodegen
:  +- Range 0, 1, 8, 5, [id#39L]
```
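The same plans can be printed with Dataset's explain operator, as the tip above suggests. This is a minimal sketch that assumes a local SparkSession. The names `ExplainDemo` and `extendedPlans` are illustrative. `explain()` prints only the physical plan, while `explain(extended = true)` prints the parsed, analyzed, optimized and physical plans. These are roughly the sections of `QueryExecution.toString` shown above.

```scala
import org.apache.spark.sql.SparkSession

object ExplainDemo {
  // The plan sections of a QueryExecution, captured as a String instead of printed
  def extendedPlans(spark: SparkSession): String =
    spark.range(5).queryExecution.toString

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("explain-demo").getOrCreate()
    try {
      val ds = spark.range(5)
      ds.explain()                // physical plan only
      ds.explain(extended = true) // parsed, analyzed, optimized and physical plans
      println(extendedPlans(spark))
    } finally spark.stop()
  }
}
```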
Caution: FIXME What’s planner? analyzed? Why do we need assertSupported?
QueryExecution belongs to the org.apache.spark.sql.execution package.
hiveResultString Method
```scala
hiveResultString(): Seq[String]
```
hiveResultString returns the result as a Hive-compatible sequence of strings.
```scala
scala> spark.range(5).queryExecution.hiveResultString
res0: Seq[String] = ArrayBuffer(0, 1, 2, 3, 4)
scala> spark.read.csv("people.csv").queryExecution.hiveResultString
res4: Seq[String] = ArrayBuffer(id name age, 0 Jacek 42)
```
Caution: FIXME
Internally, hiveResultString does..
Note: hiveResultString is executed when…
Creating QueryExecution Instance
```scala
class QueryExecution(
  val sparkSession: SparkSession,
  val logical: LogicalPlan)
```
QueryExecution requires a SparkSession and a LogicalPlan.
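A QueryExecution can also be created directly from these two inputs. The sketch below borrows the logical plan of an existing Dataset rather than building one by hand. It assumes Spark 2.x, where the constructor takes exactly these two arguments. Spark 3.x adds parameters with default values, so the two-argument call should still compile there. QueryExecution is an internal API, and `CreateQueryExecution` is an illustrative name.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution

object CreateQueryExecution {
  def run(spark: SparkSession): Long = {
    // Reuse the (unanalyzed) logical plan of an existing Dataset
    val logical = spark.range(3).queryExecution.logical
    val qe = new QueryExecution(spark, logical)
    // Accessing toRdd triggers analysis, optimization, planning and preparation;
    // count() then runs the resulting RDD[InternalRow]
    qe.toRdd.count()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("create-qe").getOrCreate()
    try println(run(spark))
    finally spark.stop()
  }
}
```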
Accessing SparkPlanner — planner Method
```scala
planner: SparkPlanner
```
planner returns the current SparkPlanner.
planner merely exposes the internal planner of the current SessionState.
preparations — SparkPlan Preparation Rules (to apply before Query Execution)
preparations is a sequence of SparkPlan optimization rules.
Tip: A SparkPlan optimization rule transforms a physical operator (aka SparkPlan) into another (possibly more efficient) SparkPlan.
The preparations collection is an intermediate phase of query execution that developers can use to introduce further optimizations.
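Such a rule can be sketched against Spark's `Rule[SparkPlan]` abstraction. The rule below, `CollectOperatorNames`, is hypothetical. It only records the names of the physical operators it visits and returns the plan unchanged. It is applied by hand to a sparkPlan rather than registered in preparations, because adding to preparations generally means subclassing QueryExecution, as IncrementalExecution does. The sketch assumes Spark 2.x or 3.x on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical rule: inspects the physical plan and returns it unchanged
object CollectOperatorNames extends Rule[SparkPlan] {
  @volatile var seen: Seq[String] = Nil

  override def apply(plan: SparkPlan): SparkPlan = {
    // collect visits every node of the tree, starting at the root
    seen = plan.collect { case op => op.nodeName }
    plan
  }
}

object CustomRuleDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("custom-rule").getOrCreate()
    try {
      val sparkPlan = spark.range(5).queryExecution.sparkPlan
      val result = CollectOperatorNames(sparkPlan)
      println(CollectOperatorNames.seen)
      println(result eq sparkPlan) // the rule returned the very same plan object
    } finally spark.stop()
  }
}
```

A real optimization rule would use `plan.transform` or `transformUp` to return a rewritten tree instead of the input plan.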
The current list of SparkPlan transformations in preparations is as follows:
- ExtractPythonUDFs
- PlanSubqueries
- CollapseCodegenStages
- ReuseExchange
- ReuseSubquery
Note: The transformation rules are applied sequentially, in the order listed, to the physical plan before execution, i.e. they generate a SparkPlan when the executedPlan lazy value is accessed.
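The sequential application can be modelled in plain Scala. This is a toy model, not Spark's code. `Plan`, `PrepRule`, `Tag` and `ToyQueryExecution` are hypothetical stand-ins for SparkPlan, Rule[SparkPlan] and QueryExecution. The rules are folded over the plan from left to right. Because executedPlan is a lazy val, the fold runs only once, on first access.

```scala
// Hypothetical stand-in for SparkPlan: just a list of operator names
final case class Plan(operators: List[String])

// Hypothetical stand-in for Rule[SparkPlan]
trait PrepRule {
  def name: String
  def apply(plan: Plan): Plan
}

// Marker rule that appends its own name, so the order of application is visible
// (the real rules rewrite the plan instead)
final case class Tag(name: String) extends PrepRule {
  def apply(plan: Plan): Plan = plan.copy(operators = plan.operators :+ name)
}

final class ToyQueryExecution(val sparkPlan: Plan, preparations: Seq[PrepRule]) {
  var prepareCount = 0 // how many times the preparation phase has run

  // Rules are applied one after another, left to right, starting from sparkPlan
  lazy val executedPlan: Plan = {
    prepareCount += 1
    preparations.foldLeft(sparkPlan) { (plan, rule) => rule(plan) }
  }
}

object ToyPreparationsDemo {
  def main(args: Array[String]): Unit = {
    val rules = Seq(Tag("ExtractPythonUDFs"), Tag("PlanSubqueries"), Tag("CollapseCodegenStages"))
    val qe = new ToyQueryExecution(Plan(List("Range")), rules)
    println(qe.prepareCount) // 0: nothing prepared yet
    println(qe.executedPlan.operators)
    println(qe.executedPlan.operators) // second access reuses the cached value
    println(qe.prepareCount) // 1
  }
}
```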
IncrementalExecution
IncrementalExecution is a custom QueryExecution with OutputMode, checkpointLocation, and currentBatchId.
It lives in org.apache.spark.sql.execution.streaming package.
Caution: FIXME What is stateStrategy?
Stateful operators in the query plan are numbered using operatorId that starts with 0.
IncrementalExecution adds one Rule[SparkPlan] called state to the preparations sequence of rules as its first element.
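Both points can be illustrated with a toy model in the same style. None of the names below are Spark's: `Op`, `ToyRule`, `BaseExecution` and `ToyIncrementalExecution` are hypothetical. In the model, a subclass prepends its own `state` rule to the inherited preparations. That rule assigns operator ids to stateful operators in order, starting at 0. The depth-first, pre-order numbering is an assumption of the model, not a statement about Spark's traversal order.

```scala
// Hypothetical physical operator tree; `stateful` marks operators that keep state
final case class Op(
    name: String,
    stateful: Boolean,
    children: List[Op] = Nil,
    operatorId: Option[Int] = None)

trait ToyRule { def apply(op: Op): Op }

class BaseExecution {
  // Stand-in for QueryExecution.preparations (here a single no-op rule)
  def preparations: Seq[ToyRule] = Seq(new ToyRule { def apply(op: Op): Op = op })
  def prepare(plan: Op): Op = preparations.foldLeft(plan)((p, r) => r(p))
}

class ToyIncrementalExecution extends BaseExecution {
  // Numbers stateful operators 0, 1, 2, ... in a depth-first, pre-order walk
  val state: ToyRule = new ToyRule {
    def apply(op: Op): Op = {
      var nextId = 0
      def number(o: Op): Op = {
        val withId =
          if (o.stateful) { val id = nextId; nextId += 1; o.copy(operatorId = Some(id)) }
          else o
        withId.copy(children = withId.children.map(number))
      }
      number(op)
    }
  }

  // `state` goes first, ahead of the inherited rules
  override def preparations: Seq[ToyRule] = state +: super.preparations
}

object ToyIncrementalDemo {
  def main(args: Array[String]): Unit = {
    val plan = Op("Root", stateful = false, List(
      Op("StatefulA", stateful = true, List(
        Op("Project", stateful = false, List(
          Op("StatefulB", stateful = true)))))))
    println(new ToyIncrementalExecution().prepare(plan))
  }
}
```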
Caution: FIXME What does IncrementalExecution do? Where is it used?