
Does spark.sql.autoBroadcastJoinThreshold work for joins using Dataset's join operator?


I'd like to know if spark.sql.autoBroadcastJoinThreshold property can be useful for broadcasting smaller table on all worker nodes (while making the join) even when the join scheme is using the Dataset API join instead of using Spark SQL.

If my bigger table is 250 Gigs and Smaller is 20 Gigs, do I need to set this config: spark.sql.autoBroadcastJoinThreshold = 21 Gigs (maybe) in order for sending the whole table / Dataset to all worker nodes?
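For reference, the property is compared against a size in bytes, so a hypothetical "21 Gigs" threshold would be written out as below (a sketch only; whether broadcasting a 20 GB table is advisable is a separate question, as the answers discuss):

```scala
// spark.sql.autoBroadcastJoinThreshold takes a size in bytes.
// A hypothetical 21 GB value:
val twentyOneGb: Long = 21L * 1024 * 1024 * 1024
println(twentyOneGb)  // 22548578304
// spark.conf.set("spark.sql.autoBroadcastJoinThreshold", twentyOneGb)
```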

Examples:

  • Dataset API join

    val result = rawBigger.as("b").join(
      broadcast(smaller).as("s"),
      rawBigger(FieldNames.CAMPAIGN_ID) === smaller(FieldNames.CAMPAIGN_ID),
      "left_outer"
    )
  • SQL

    select *
    from rawBigger_table b, smaller_table s
    where b.campaign_id = s.campaign_id;
asked May 15 '17 by Apratim Tiwari



2 Answers

First of all, spark.sql.autoBroadcastJoinThreshold and the broadcast hint are separate mechanisms. Even if autoBroadcastJoinThreshold is disabled, setting a broadcast hint will take precedence. With the default settings:

    spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
    // String = 10485760

    val df1 = spark.range(100)
    val df2 = spark.range(100)

Spark will use autoBroadcastJoinThreshold and automatically broadcast data:

    df1.join(df2, Seq("id")).explain

    == Physical Plan ==
    *Project [id#0L]
    +- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
       :- *Range (0, 100, step=1, splits=Some(8))
       +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
          +- *Range (0, 100, step=1, splits=Some(8))

When we disable auto broadcast, Spark will use the standard SortMergeJoin:

    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    df1.join(df2, Seq("id")).explain

    == Physical Plan ==
    *Project [id#0L]
    +- *SortMergeJoin [id#0L], [id#3L], Inner
       :- *Sort [id#0L ASC NULLS FIRST], false, 0
       :  +- Exchange hashpartitioning(id#0L, 200)
       :     +- *Range (0, 100, step=1, splits=Some(8))
       +- *Sort [id#3L ASC NULLS FIRST], false, 0
          +- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200)

but it can be forced to use BroadcastHashJoin with a broadcast hint:

    df1.join(broadcast(df2), Seq("id")).explain

    == Physical Plan ==
    *Project [id#0L]
    +- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
       :- *Range (0, 100, step=1, splits=Some(8))
       +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
          +- *Range (0, 100, step=1, splits=Some(8))

SQL has its own hint format (similar to the one used in Hive):

    df1.createOrReplaceTempView("df1")
    df2.createOrReplaceTempView("df2")

    spark.sql(
      "SELECT /*+ MAPJOIN(df2) */ * FROM df1 JOIN df2 ON df1.id = df2.id"
    ).explain

    == Physical Plan ==
    *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
    :- *Range (0, 100, step=1, splits=8)
    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
       +- *Range (0, 100, step=1, splits=8)

So, to answer your question: autoBroadcastJoinThreshold is applicable when working with the Dataset API, but it is not relevant when using explicit broadcast hints.

Furthermore, broadcasting large objects is unlikely to provide any performance boost, and in practice will often degrade performance and cause stability issues. Remember that the broadcasted object has to be first fetched to the driver, then sent to each worker, and finally loaded into memory.
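To see why, here is a naive back-of-the-envelope for the sizes in the question (illustrative arithmetic only; the executor count is made up, and Spark's torrent broadcast plus compression make real traffic lower than this worst case):

```scala
// Naive upper bound on data movement when broadcasting a 20 GB table
// to a hypothetical 50-executor cluster: fetch to the driver once,
// then ship a full copy to every executor.
val smallerGb = 20L
val executors = 50L
val movedGb   = smallerGb * (1 + executors)
println(movedGb)  // 1020 (GB) moved, and each executor must also hold 20 GB in memory
```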

answered Sep 23 '22 by zero323


Just to share more details (from the code) to complement the great answer from @user6910411.


Quoting the source code (formatting mine):

spark.sql.autoBroadcastJoinThreshold configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join.

By setting this value to -1 broadcasting can be disabled.

Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE COMPUTE STATISTICS noscan has been run, and file-based data source tables where the statistics are computed directly on the files of data.

spark.sql.autoBroadcastJoinThreshold defaults to 10 MB (i.e. 10L * 1024 * 1024) and Spark will check which join to use (see the JoinSelection execution planning strategy).
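As a quick sketch of checking and toggling it at runtime (assumes an active SparkSession named `spark`, e.g. in spark-shell):

```scala
// Sketch: read, disable, and restore the threshold in a live session.
val previous = spark.conf.get("spark.sql.autoBroadcastJoinThreshold") // "10485760" by default
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)            // turn auto broadcast off
// ... run joins and compare their .explain output ...
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", previous)      // restore
```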

There are 6 different join selections and among them is broadcasting (using BroadcastHashJoinExec or BroadcastNestedLoopJoinExec physical operators).

BroadcastHashJoinExec will get chosen when there are joining keys and one of the following holds:

  • Join is one of CROSS, INNER, LEFT ANTI, LEFT OUTER, LEFT SEMI and right join side can be broadcast, i.e. the size is less than spark.sql.autoBroadcastJoinThreshold
  • Join is one of CROSS, INNER and RIGHT OUTER and left join side can be broadcast, i.e. the size is less than spark.sql.autoBroadcastJoinThreshold
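The two rules above can be sketched as a tiny self-contained model (illustrative Scala only, not Spark's actual JoinSelection code; the function and type names here are made up):

```scala
// Illustrative model of which side, if any, gets broadcast for a keyed join,
// following the two rules above. Sizes are in bytes.
sealed trait Side
case object BuildLeft extends Side
case object BuildRight extends Side

def broadcastSide(joinType: String, leftBytes: Long, rightBytes: Long,
                  threshold: Long): Option[Side] = {
  val canBuildRight = Set("cross", "inner", "left_anti", "left_outer", "left_semi")
  val canBuildLeft  = Set("cross", "inner", "right_outer")
  if (canBuildRight(joinType) && rightBytes < threshold) Some(BuildRight)
  else if (canBuildLeft(joinType) && leftBytes < threshold) Some(BuildLeft)
  else None
}

// A 250 GB left side joined with a 5 MB right side under the 10 MB default:
println(broadcastSide("left_outer", 250L << 30, 5L << 20, 10L * 1024 * 1024))
// Some(BuildRight): the right side fits under the threshold
```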

BroadcastNestedLoopJoinExec will get chosen when there are no joining keys and one of the above conditions of BroadcastHashJoinExec holds.

In other words, Spark will automatically select the right join, including BroadcastHashJoinExec, based on the spark.sql.autoBroadcastJoinThreshold property and the join type (among other requirements).

answered Sep 24 '22 by Jacek Laskowski