I'd like to know whether the spark.sql.autoBroadcastJoinThreshold property can be useful for broadcasting the smaller table to all worker nodes (while making the join) even when the join is written with the Dataset API instead of Spark SQL.

If my bigger table is 250 GB and the smaller one is 20 GB, do I need to set spark.sql.autoBroadcastJoinThreshold to 21 GB (maybe) in order to send the whole smaller table / Dataset to all worker nodes?
Examples:
Dataset API join
val result = rawBigger.as("b").join(
  broadcast(smaller).as("s"),
  rawBigger(FieldNames.CAMPAIGN_ID) === smaller(FieldNames.CAMPAIGN_ID),
  "left_outer"
)
SQL
select * from rawBigger_table b, smaller_table s where b.campaign_id = s.campaign_id;
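For the sizes in the question, the value would have to be given in bytes, since the config takes a byte count. A minimal sketch of the arithmetic, with the actual conf.set left as a comment because it needs a live SparkSession (whether broadcasting a table this large is advisable is a separate question, addressed in the answers below):

```scala
// spark.sql.autoBroadcastJoinThreshold is specified in bytes, so the
// 21 GB figure from the question has to be converted first.
val thresholdBytes = 21L * 1024 * 1024 * 1024
println(thresholdBytes) // 22548578304

// In a live SparkSession this would then be applied as:
// spark.conf.set("spark.sql.autoBroadcastJoinThreshold", thresholdBytes)
```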
spark.sql.autoBroadcastJoinThreshold (default: 10485760, i.e. 10 MB): configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled.
Broadcast join is one of the most efficient join strategies in Spark, since it involves the least amount of data shuffling across the nodes.

Now, coming to the broadcast hash join: in a broadcast hash join, a copy of one of the join relations is sent to all the worker nodes, which saves shuffling cost. This is useful when you are joining a large relation with a smaller one. It is also known as a map-side join (associating worker nodes with mappers).
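The map-side idea can be sketched in plain Scala (this is illustrative only, not Spark API): the smaller relation becomes an in-memory hash map, the "build" side, and the larger relation is scanned against it without any shuffle.

```scala
// Illustrative map-side (broadcast hash) join in plain Scala, not Spark API.
// The small relation is turned into a hash map that each "mapper" would hold;
// the large relation is then scanned against it without any shuffle.
val small = Seq((1, "campaignA"), (2, "campaignB"))           // broadcast (build) side
val big   = Seq((1, 100.0), (1, 250.0), (2, 75.0), (3, 5.0))  // streamed (probe) side

val buildSide = small.toMap // build phase: hash the small side
val joined = big.flatMap { case (id, value) =>
  buildSide.get(id).map(name => (id, name, value)) // probe phase, inner-join semantics
}
println(joined) // List((1,campaignA,100.0), (1,campaignA,250.0), (2,campaignB,75.0))
```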
First of all, spark.sql.autoBroadcastJoinThreshold and the broadcast hint are separate mechanisms. Even if autoBroadcastJoinThreshold is disabled, setting the broadcast hint will take precedence. With default settings:
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
String = 10485760
val df1 = spark.range(100)
val df2 = spark.range(100)
Spark will use autoBroadcastJoinThreshold and automatically broadcast the data:
df1.join(df2, Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
   :- *Range (0, 100, step=1, splits=Some(8))
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 100, step=1, splits=Some(8))
When we disable auto broadcast, Spark will use the standard SortMergeJoin:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df1.join(df2, Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *SortMergeJoin [id#0L], [id#3L], Inner
   :- *Sort [id#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#0L, 200)
   :     +- *Range (0, 100, step=1, splits=Some(8))
   +- *Sort [id#3L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200)
but it can be forced to use BroadcastHashJoin with the broadcast hint:
df1.join(broadcast(df2), Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
   :- *Range (0, 100, step=1, splits=Some(8))
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 100, step=1, splits=Some(8))
SQL has its own hint format (similar to the one used in Hive):
df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

spark.sql(
  "SELECT /*+ MAPJOIN(df2) */ * FROM df1 JOIN df2 ON df1.id = df2.id"
).explain
== Physical Plan ==
*BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   +- *Range (0, 100, step=1, splits=8)
So to answer your question: autoBroadcastJoinThreshold is applicable when working with the Dataset API, but it is not relevant when using explicit broadcast hints.

Furthermore, broadcasting large objects is unlikely to provide any performance boost, and in practice will often degrade performance and result in stability issues. Remember that a broadcasted object has to be fetched to the driver first, then sent to each worker, and finally loaded into memory.
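To make that cost concrete with assumed numbers (a 20 GB relation and a 50-executor cluster, both hypothetical): each executor materializes its own copy of the broadcast relation, on top of the initial fetch to the driver.

```scala
// Back-of-the-envelope memory cost of a broadcast (figures are assumptions).
val broadcastSizeGb = 20L // size of the would-be broadcast relation
val numExecutors    = 50L // executors in a hypothetical cluster
val clusterWideGb   = broadcastSizeGb * numExecutors
println(s"Memory held by broadcast copies across the cluster: $clusterWideGb GB")
```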
Just to add more details (from the code) to the great answer from @user6910411.
Quoting the source code (formatting mine):
spark.sql.autoBroadcastJoinThreshold configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join.
By setting this value to -1 broadcasting can be disabled.
Note that currently statistics are only supported for Hive Metastore tables where the command
ANALYZE TABLE COMPUTE STATISTICS noscan
has been run, and file-based data source tables where the statistics are computed directly on the files of data.
spark.sql.autoBroadcastJoinThreshold defaults to 10 MB (i.e. 10L * 1024 * 1024) and Spark will check what join to use (see the JoinSelection execution planning strategy).
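The size check that JoinSelection applies can be mirrored in a small sketch (shouldAutoBroadcast is a hypothetical helper, not Spark API; Spark compares a plan's estimated sizeInBytes against the threshold):

```scala
// Hypothetical helper mirroring Spark's broadcast-eligibility test: a side is
// auto-broadcast only when the threshold is positive and the plan's estimated
// size (stats.sizeInBytes in Spark) does not exceed it.
def shouldAutoBroadcast(estimatedSizeBytes: Long, thresholdBytes: Long): Boolean =
  thresholdBytes > 0 && estimatedSizeBytes <= thresholdBytes

val defaultThreshold = 10L * 1024 * 1024 // 10 MB default

println(shouldAutoBroadcast(5L * 1024 * 1024, defaultThreshold))         // true: small table
println(shouldAutoBroadcast(20L * 1024 * 1024 * 1024, defaultThreshold)) // false: 20 GB table
println(shouldAutoBroadcast(1024L, -1L))                                 // false: -1 disables
```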
There are 6 different join selections, and among them is broadcasting (using the BroadcastHashJoinExec or BroadcastNestedLoopJoinExec physical operators).
BroadcastHashJoinExec will get chosen when there are joining keys and one of the following holds (which side is eligible depends on the join type):

- the right side of the join can be broadcast, i.e. its estimated size is below spark.sql.autoBroadcastJoinThreshold
- the left side of the join can be broadcast, i.e. its estimated size is below spark.sql.autoBroadcastJoinThreshold
BroadcastNestedLoopJoinExec will get chosen when there are no joining keys and one of the above conditions of BroadcastHashJoinExec holds.
In other words, Spark will automatically select the right join, including BroadcastHashJoinExec, based on the spark.sql.autoBroadcastJoinThreshold property (among other requirements), but also based on the join type.