I am trying to efficiently join two DataFrames, one of which is large and the other a bit smaller.
Is there a way to avoid all this shuffling? I cannot set spark.sql.autoBroadcastJoinThreshold, because it supports only Integers, and the table I am trying to broadcast is slightly bigger than the maximum Integer number of bytes.
Is there a way to force a broadcast, ignoring this variable?
When the broadcasted relation is small enough, broadcast joins are fast, as they require minimal data shuffling. Above a certain threshold, however, broadcast joins tend to be less reliable or performant than shuffle-based join algorithms, due to bottlenecks in network and memory usage.
Broadcast Hash Join is the fastest join algorithm when the following criteria are met: the join is an equi-join, the join type is anything except a full outer join, and one dataset is small enough to be broadcast and hashed.
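To make the "broadcast and hashed" part concrete, here is a minimal plain-Scala sketch of the idea behind a broadcast hash join. It is not Spark's implementation; the Fact and Dim types and the broadcastHashJoin function are hypothetical names made up for illustration. The small side is hashed once (build phase) and the large side is streamed past it (probe phase), so the large side never has to be shuffled or sorted.

```scala
// Hypothetical record types, for illustration only (not part of Spark).
final case class Fact(key: Int, amount: Double)
final case class Dim(key: Int, name: String)

// Inner equi-join on `key`: hash the small side, stream the large side.
def broadcastHashJoin(large: Iterator[Fact], small: Seq[Dim]): Iterator[(Fact, Dim)] = {
  // Build phase: hash the small side once. In Spark, this hashed relation
  // is what would be shipped to every executor.
  val table: Map[Int, Seq[Dim]] = small.groupBy(_.key)
  // Probe phase: each large-side row does one hash lookup; rows without a
  // match are dropped, which is what makes this an inner join.
  large.flatMap { fact =>
    table.getOrElse(fact.key, Seq.empty).map(dim => (fact, dim))
  }
}

val facts = Seq(Fact(1, 10.0), Fact(2, 5.0), Fact(3, 7.5), Fact(1, 2.5))
val dims = Seq(Dim(1, "north"), Dim(2, "south"))
val result = broadcastHashJoin(facts.iterator, dims).toList
```

The sketch also shows why the small side must fit in memory: the whole hash table is held at once, while the large side is only ever iterated.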
If we do not want a broadcast join to take place, we can disable it by setting "spark.sql.autoBroadcastJoinThreshold" to "-1".
In Spark SQL you can see the type of join being performed by calling queryExecution.executedPlan. As with core Spark, if one of the tables is much smaller than the other you may want a broadcast hash join. You can hint to Spark SQL that a given DF should be broadcast for the join by calling the broadcast method on the DataFrame before joining it.
Example: largedataframe.join(broadcast(smalldataframe), "key")
In DWH terms, largedataframe may be like a fact table and smalldataframe like a dimension table.
This is described in my favourite book, High Performance Spark (HPS).
Note: the broadcast above is from import org.apache.spark.sql.functions.broadcast, not from SparkContext.
Spark also automatically uses spark.sql.autoBroadcastJoinThreshold to determine if a table should be broadcast.
def explain(): Unit
Prints the physical plan to the console for debugging purposes.
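Is there a way to force broadcast ignoring this variable?
Yes. sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1") turns off the automatic, size-based broadcast, while an explicit broadcast hint on the DataFrame still applies.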
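A small sketch of how this could be checked, assuming spark-sql is on the classpath and a local session is acceptable. The names largeDf, smallDf, joined and planText, and the toy data, are made up for illustration. It disables the automatic threshold, applies the broadcast hint from org.apache.spark.sql.functions, and then inspects the physical plan.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Assumption: a throwaway local session; in spark-shell one already exists as `spark`.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("broadcast-hint-sketch")
  .getOrCreate()
import spark.implicits._

// Turn off size-based automatic broadcasting; only explicit hints remain.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// Toy stand-ins for the fact (large) and dimension (small) tables.
val largeDf = spark.range(0, 1000000).withColumnRenamed("id", "key")
val smallDf = Seq((1L, "a"), (2L, "b")).toDF("key", "label")

// The hint marks smallDf for broadcast regardless of the threshold setting.
val joined = largeDf.join(broadcast(smallDf), "key")

// Print the physical plan, and keep a string copy for inspection.
joined.explain()
val planText = joined.queryExecution.executedPlan.toString
```

If the hint is honored, the printed plan should mention a broadcast hash join rather than a sort-merge join. Whether broadcasting a table larger than the threshold is a good idea still depends on driver and executor memory.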
NOTE: Another, similar out-of-the-box note w.r.t. Hive (not Spark): a similar thing can be achieved using the Hive hint MAPJOIN, like below.
Select /*+ MAPJOIN(b) */ a.key, a.value from a join b on a.key = b.key;
hive> set hive.auto.convert.join=true;
hive> set hive.auto.convert.join.noconditionaltask.size=20971520;
hive> set hive.auto.convert.join.noconditionaltask=true;
hive> set hive.auto.convert.join.use.nonstaged=true;
hive> set hive.mapjoin.smalltable.filesize = 30000000; -- default is 25 MB, raised here to 30 MB
Further Reading: Please refer to my article on BHJ, SHJ, SMJ
You can hint for a dataframe to be broadcasted by using left.join(broadcast(right), ...)