We use a broadcast hash join in Spark when one dataframe is small enough to fit into memory, that is, when the size of the small dataframe is below spark.sql.autoBroadcastJoinThreshold.
I have a few questions around this.
What is the life cycle of the small dataframe that we hint as broadcast? For how long will it remain in memory? How can we control that?
For example, say I have joined a big dataframe with a small dataframe twice using a broadcast hash join. When the first join runs, it will broadcast the small dataframe to the worker nodes and perform the join while avoiding a shuffle of the big dataframe's data.
My question is: for how long will the executor keep a copy of the broadcast dataframe? Will it remain in memory until the session ends? Or will it get cleared once we have taken an action? Can we control or clear it? Or am I just thinking in the wrong direction?
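For reference, here is a minimal sketch of the scenario I mean (the dataframe names and sizes are made up for illustration):

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()

# Hypothetical data: big_df is large, small_df easily fits in executor memory.
big_df = spark.range(1_000_000).withColumnRenamed("id", "key")
small_df = spark.range(100).selectExpr("id as key", "id * 2 as value")

# First join: small_df is broadcast to every executor, no shuffle of big_df.
joined_once = big_df.join(broadcast(small_df), "key")

# Second join against the same small dataframe: is the broadcast copy on the
# executors reused here, or does it get re-broadcast? And when is it freed?
joined_twice = joined_once.join(broadcast(small_df), "key")
joined_twice.explain()  # both joins appear as BroadcastHashJoin in the plan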
Broadcast hash join - a broadcast join copies the small dataset to every worker node, which leads to a highly efficient and super-fast join. When we are joining two datasets and one of them is much smaller than the other (e.g. when the small dataset can fit into memory), we should use a broadcast hash join.
Broadcast nested loop join - in a nested loop join, each row of the first dataset is iterated over every row of the other dataset, which can degrade join performance. It is used when neither a broadcast hash join, a shuffled hash join, nor a sort merge join can execute the join statement, for example when there are no equi-join keys, provided one side qualifies as broadcastable according to the data statistics (size) or a broadcast hint.
The right side of a left outer, left semi, left anti, or existence join will be broadcast. Either side can be broadcast in an inner-like join. Once the dataset is broadcast, every record from one dataset is compared against every record from the other dataset in a nested loop.
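As a minimal sketch (the dataframes here are hypothetical), a non-equi join has no keys to hash or sort on, so hinting the small side forces a broadcast nested loop join:

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()

# Hypothetical data: match each event timestamp to a small set of ranges.
events = spark.range(1_000).withColumnRenamed("id", "ts")
ranges = spark.range(10).selectExpr("id * 100 as lo", "id * 100 + 50 as hi")

# No equality predicate, so a hash or sort-merge join is impossible; with the
# hint, the small side is broadcast and joined in a nested loop.
joined = events.join(broadcast(ranges),
                     (events.ts >= ranges.lo) & (events.ts <= ranges.hi))
joined.explain()  # physical plan shows BroadcastNestedLoopJoin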
The answer to your question, at least in Spark 2.4.0, is that the dataframe will remain in memory on the driver process until the SparkContext is completed, that is, until your application ends.
Broadcast joins are in fact implemented using broadcast variables, but when using the DataFrame API you do not get access to the underlying broadcast variable. Spark itself does not destroy this variable after it uses it internally, so it just stays around.
Specifically, if you look at the code of BroadcastExchangeExec (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala), you can see that it creates a private variable relationFuture which holds the Broadcast variable. This private variable is only used in this class. There is no way for you as a user to get access to it to call destroy on it, and nowhere in the current implementation does Spark call it for you.
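You can see this exchange in the physical plan, but no handle to it is ever returned to Python; a quick sketch (the dataframes are hypothetical):

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()
big_df = spark.range(1_000_000).withColumnRenamed("id", "key")
small_df = spark.range(100).withColumnRenamed("id", "key")

# The plan contains a BroadcastExchange node (built by BroadcastExchangeExec),
# but the underlying Broadcast variable is never exposed to user code.
big_df.join(broadcast(small_df), "key").explain()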
The idea here is to create the broadcast variable before the join so that you can easily control it. Without that you can't control these variables - Spark does it for you.
Example:
from pyspark.sql.functions import broadcast

# Mark sdf2 for broadcast explicitly, then use the marked dataframe in the join.
sdf2_bd = broadcast(sdf2)
sdf1.join(sdf2_bd, sdf1.id == sdf2_bd.id)
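Note that broadcast() here is only a hint to the planner; it does not return a broadcast variable you can destroy. If you need full control over the lifecycle, you can create the broadcast variable yourself via SparkContext.broadcast and ship a plain lookup structure instead - a minimal sketch (the dataframe and lookup contents are made up):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()
sdf1 = spark.createDataFrame([(1,), (2,), (3,)], ["id"])

# Hypothetical small lookup table, broadcast by hand so we own its lifecycle.
lookup = {1: "a", 2: "b"}
bv = spark.sparkContext.broadcast(lookup)

@udf(StringType())
def lookup_label(key):
    return bv.value.get(key)

sdf1.withColumn("label", lookup_label("id")).show()

# Free the copies on the executors (it can be re-broadcast if used again)...
bv.unpersist()
# ...or drop it from the driver and executors for good.
bv.destroy()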
The same rules apply to all broadcast variables, whether created automatically in joins or by hand.