class BroadcastHashJoinExec(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: BuildSide,
condition: Option[Expression],
left: SparkPlan,
right: SparkPlan)
extends BinaryExecNode with HashJoin with CodegenSupport
BroadcastHashJoinExec Physical Operator
BroadcastHashJoinExec is a binary physical operator with CodegenSupport.
BroadcastHashJoinExec is the result of applying the JoinSelection physical plan strategy to logical query plans that ExtractEquiJoinKeys can destructure (i.e. "INNER", "CROSS", "LEFT OUTER", "LEFT SEMI" and "LEFT ANTI" joins) and whose right physical operator can be broadcast.
scala> val df = Seq((0,"playing"), (1, "with"), (2, "broadcast")).toDF("id", "token")
df: org.apache.spark.sql.DataFrame = [id: int, token: string]
scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res5: String = 10485760
scala> df.join(df, Seq("id"), "inner").explain(extended=true)
== Parsed Logical Plan ==
'Join UsingJoin(Inner,List('id))
:- Project [_1#30 AS id#33, _2#31 AS token#34]
: +- LocalRelation [_1#30, _2#31]
+- Project [_1#30 AS id#38, _2#31 AS token#39]
+- LocalRelation [_1#30, _2#31]
== Analyzed Logical Plan ==
id: int, token: string, token: string
Project [id#33, token#34, token#39]
+- Join Inner, (id#33 = id#38)
:- Project [_1#30 AS id#33, _2#31 AS token#34]
: +- LocalRelation [_1#30, _2#31]
+- Project [_1#30 AS id#38, _2#31 AS token#39]
+- LocalRelation [_1#30, _2#31]
== Optimized Logical Plan ==
Project [id#33, token#34, token#39]
+- Join Inner, (id#33 = id#38)
:- LocalRelation [id#33, token#34]
+- LocalRelation [id#38, token#39]
== Physical Plan ==
*Project [id#33, token#34, token#39]
+- *BroadcastHashJoin [id#33], [id#38], Inner, BuildRight
:- LocalTableScan [id#33, token#34]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- LocalTableScan [id#38, token#39]
BroadcastHashJoinExec variables in CodegenSupport-generated code are prefixed with bhj (e.g. bhj_broadcast, bhj_relation).
Tip: Use the debugCodegen method to review the CodegenSupport-generated code.
scala> df.join(df, Seq("id"), "inner").debugCodegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Project [id#33, token#34, token#52]
+- *BroadcastHashJoin [id#33], [id#51], Inner, BuildRight
:- LocalTableScan [id#33, token#34]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- LocalTableScan [id#51, token#52]
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private scala.collection.Iterator[] inputs;
/* 008 */ private scala.collection.Iterator inputadapter_input;
/* 009 */ private org.apache.spark.broadcast.TorrentBroadcast bhj_broadcast;
/* 010 */ private org.apache.spark.sql.execution.joins.LongHashedRelation bhj_relation;
/* 011 */ private org.apache.spark.sql.execution.metric.SQLMetric bhj_numOutputRows;
/* 012 */ private UnsafeRow bhj_result;
/* 013 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder bhj_holder;
/* 014 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter bhj_rowWriter;
/* 015 */ private UnsafeRow project_result;
/* 016 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
/* 017 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter project_rowWriter;
/* 018 */
...