
Broadcast hash join - Iterative

We use broadcast hash join in Spark when one dataframe is small enough to fit into memory. Spark chooses it automatically when the size of the small dataframe is below spark.sql.autoBroadcastJoinThreshold. I have a few questions about this.

What is the life cycle of the small dataframe that we hint as broadcast? How long will it remain in memory? How can we control it?

For example, suppose I join a big dataframe with a small dataframe twice using broadcast hash join. When the first join runs, Spark broadcasts the small dataframe to the worker nodes and performs the join without shuffling the big dataframe's data.

My question is: how long will an executor keep its copy of the broadcast dataframe? Will it remain in memory until the session ends, or will it be cleared once an action has completed? Can we control or clear it? Or am I just thinking in the wrong direction?

vikrant rana asked Dec 14 '18 17:12



2 Answers

The answer to your question, at least in Spark 2.4.0, is that the dataframe will remain in memory on the driver process until the SparkContext is stopped, that is, until your application ends.

Broadcast joins are in fact implemented using broadcast variables, but when using the DataFrame API you do not get access to the underlying broadcast variable. Spark itself does not destroy this variable after using it internally, so it just stays around.

Specifically, if you look at the code of BroadcastExchangeExec (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala), you can see that it creates a private variable relationFuture which holds the Broadcast variable. This private variable is only used in this class. There is no way for you as a user to get access to it and call destroy on it, and nowhere in the current implementation does Spark call destroy for you.
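
For intuition about what that broadcast variable holds and how it is used, here is a rough sketch of a broadcast hash join in plain Python. This is not Spark code and not how BroadcastExchangeExec is written; the function names (build_hash_table, probe_partition) and the sample rows are made up for illustration. It only shows the general technique: the small side is hashed once, and every partition of the big side probes that same copy locally, so the big side never has to be shuffled.

```python
from collections import defaultdict


def build_hash_table(small_rows, key):
    """Build phase: index the small relation by join key (done once)."""
    table = defaultdict(list)
    for row in small_rows:
        table[row[key]].append(row)
    return table


def probe_partition(big_partition, hashed_small, key):
    """Probe phase: a partition of the big side looks up matches locally."""
    for row in big_partition:
        # .get() avoids inserting empty lists for keys with no match
        for match in hashed_small.get(row[key], []):
            yield {**match, **row}


# Hypothetical data: the big side is already partitioned and stays in place.
small = [{"id": 1, "country": "IN"}, {"id": 2, "country": "US"}]
big_partitions = [
    [{"id": 1, "amount": 10}, {"id": 3, "amount": 5}],
    [{"id": 2, "amount": 7}, {"id": 1, "amount": 2}],
]

# Stands in for the copy of the small relation that each executor receives.
hashed_small = build_hash_table(small, "id")

# Inner join: rows of the big side without a match (id 3) are dropped.
joined = [
    out
    for partition in big_partitions
    for out in probe_partition(partition, hashed_small, "id")
]
```

In this toy model the hashed_small object plays the role of the broadcast relation: as long as something holds a reference to it, it stays in memory, which mirrors the situation described above.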

Dave DeCaprio answered Sep 28 '22 05:09


The idea here is to create the broadcast variable before the join so that you can control it easily. Without that, you can't control these variables; Spark manages them for you.

Example:

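from pyspark.sql.functions import broadcast

# sdf1 (large) and sdf2 (small) are assumed to be existing DataFrames with an id column.
# broadcast() returns sdf2 marked with a broadcast hint for the join planner.
sdf2_bd = broadcast(sdf2)
joined = sdf1.join(sdf2_bd, sdf1.id == sdf2_bd.id)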

These rules apply to all broadcast variables, whether created automatically in joins or by hand:

  1. The broadcast data is sent only to the nodes that contain an executor that needs it.
  2. The broadcast data is stored in memory. If not enough memory is available, the disk is used.
  3. When you are done with a broadcast variable, you should destroy it to release memory.

luminousmen answered Sep 28 '22 06:09