BroadcastHashJoinExec Physical Operator

BroadcastHashJoinExec is a binary physical operator (BinaryExecNode) that supports Java code generation (CodegenSupport).

case class BroadcastHashJoinExec(
  leftKeys: Seq[Expression],
  rightKeys: Seq[Expression],
  joinType: JoinType,
  buildSide: BuildSide,
  condition: Option[Expression],
  left: SparkPlan,
  right: SparkPlan)
extends BinaryExecNode with HashJoin with CodegenSupport
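You can also look the operator up in an executed physical plan programmatically. The following is a minimal sketch, assuming a spark-shell session with a SparkSession available as spark; the names small, joined and bhjs are illustrative only:

import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import spark.implicits._

// An illustrative tiny DataFrame joined with itself.
val small = Seq((0, "zero"), (1, "one")).toDF("id", "token")
val joined = small.join(small, Seq("id"), "inner")

// SparkPlan is a TreeNode, so collect traverses the entire physical plan
// (including operators wrapped in WholeStageCodegenExec).
val bhjs = joined.queryExecution.executedPlan.collect {
  case bhj: BroadcastHashJoinExec => bhj
}
assert(bhjs.nonEmpty, "Expected a BroadcastHashJoinExec in the physical plan")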

BroadcastHashJoinExec is created as a result of applying the JoinSelection physical plan strategy to logical query plans that ExtractEquiJoinKeys can destructure, i.e. INNER, CROSS, LEFT OUTER, LEFT SEMI and LEFT ANTI joins, of which the right join side can be broadcast.

scala> val df = Seq((0,"playing"), (1, "with"), (2, "broadcast")).toDF("id", "token")
df: org.apache.spark.sql.DataFrame = [id: int, token: string]

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res5: String = 10485760

scala> df.join(df, Seq("id"), "inner").explain(extended=true)
== Parsed Logical Plan ==
'Join UsingJoin(Inner,List('id))
:- Project [_1#30 AS id#33, _2#31 AS token#34]
:  +- LocalRelation [_1#30, _2#31]
+- Project [_1#30 AS id#38, _2#31 AS token#39]
   +- LocalRelation [_1#30, _2#31]

== Analyzed Logical Plan ==
id: int, token: string, token: string
Project [id#33, token#34, token#39]
+- Join Inner, (id#33 = id#38)
   :- Project [_1#30 AS id#33, _2#31 AS token#34]
   :  +- LocalRelation [_1#30, _2#31]
   +- Project [_1#30 AS id#38, _2#31 AS token#39]
      +- LocalRelation [_1#30, _2#31]

== Optimized Logical Plan ==
Project [id#33, token#34, token#39]
+- Join Inner, (id#33 = id#38)
   :- LocalRelation [id#33, token#34]
   +- LocalRelation [id#38, token#39]

== Physical Plan ==
*Project [id#33, token#34, token#39]
+- *BroadcastHashJoin [id#33], [id#38], Inner, BuildRight
   :- LocalTableScan [id#33, token#34]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#38, token#39]
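The right side above is tiny, so it fits under spark.sql.autoBroadcastJoinThreshold and the planner picks BroadcastHashJoin automatically. A minimal sketch of the two usual ways to influence that decision, assuming the same df and a SparkSession as spark:

// Hint the planner explicitly, regardless of the threshold.
import org.apache.spark.sql.functions.broadcast
df.join(broadcast(df), Seq("id"), "inner").explain()

// Disable automatic broadcast joins; the planner then falls back to another
// join implementation (typically SortMergeJoin for equi-joins).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)
df.join(df, Seq("id"), "inner").explain()

// Restore the default threshold (10485760 bytes, i.e. 10 MB).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10485760L)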

The variables that BroadcastHashJoinExec contributes to CodegenSupport-generated Java code are prefixed with bhj (see the generated code below).

Tip
Use the debugCodegen method (available on a Dataset after import org.apache.spark.sql.execution.debug._) to review the CodegenSupport-generated code.
scala> df.join(df, Seq("id"), "inner").debugCodegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Project [id#33, token#34, token#52]
+- *BroadcastHashJoin [id#33], [id#51], Inner, BuildRight
   :- LocalTableScan [id#33, token#34]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#51, token#52]

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private org.apache.spark.broadcast.TorrentBroadcast bhj_broadcast;
/* 010 */   private org.apache.spark.sql.execution.joins.LongHashedRelation bhj_relation;
/* 011 */   private org.apache.spark.sql.execution.metric.SQLMetric bhj_numOutputRows;
/* 012 */   private UnsafeRow bhj_result;
/* 013 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder bhj_holder;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter bhj_rowWriter;
/* 015 */   private UnsafeRow project_result;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
/* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter project_rowWriter;
/* 018 */
...

results matching ""

    No results matching ""