Below is the sample code that I am running. When this Spark job runs, the DataFrame join is executed as a SortMergeJoin instead of a BroadcastJoin.
def joinedDf(sqlContext: SQLContext,
             txnTable: DataFrame,
             countriesDfBroadcast: Broadcast[DataFrame]): DataFrame = {
  txnTable.as("df1").join(
    countriesDfBroadcast.value.withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries"),
    $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner")
}
joinedDf(sqlContext, txnTable, countriesDfBroadcast).write.parquet("temp")
The broadcast join is not happening even when I specify a broadcast() hint in the join statement. The optimizer is hash-partitioning the DataFrame, and that is causing data skew.
Has anyone seen this behavior?
I am running this on YARN with Spark 1.6, using a HiveContext as the SQLContext. The job runs on 200 executors; txnTable is 240 GB and countriesDf is 5 MB.
Syntax for a PySpark broadcast join, b1.join(broadcast(b2), on, how): b1 is the first DataFrame used for the join; b2 is the second, broadcasted DataFrame; join is the join operation; broadcast is the function that marks the DataFrame for broadcasting.
Broadcast variables are used the same way for RDDs, DataFrames, and Datasets. When you run a Spark job that defines and uses broadcast variables, Spark breaks the job into stages separated by distributed shuffles, and actions are executed within each stage.
A broadcast join in Spark is preferred when joining one small DataFrame with a large one. The requirement is that the small DataFrame fits comfortably in memory, so it can be shipped to every executor and joined against the large DataFrame without shuffling it, which boosts join performance.
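To make the mechanism concrete, here is a minimal, Spark-free Scala sketch of the broadcast-hash-join idea (the case classes and field names are illustrative assumptions, not the poster's actual schema): build an in-memory map from the small side, then probe it while streaming over the large side.

```scala
// Illustrative stand-ins for the two sides of the join.
case class Country(cntryId: Int, name: String)
case class Txn(txnId: Long, userCntryId: Int, amount: Double)

def broadcastHashJoin(txns: Seq[Txn], countries: Seq[Country]): Seq[(Txn, Country)] = {
  // "Broadcast" step: the small side fits in memory, so every worker
  // could hold this map locally.
  val byId: Map[Int, Country] = countries.map(c => c.cntryId -> c).toMap
  // Probe step: stream the large side; it is never shuffled or sorted.
  txns.flatMap(t => byId.get(t.userCntryId).map(c => (t, c)))
}
```

This is why broadcast joins avoid the skew the poster is seeing: the large table's partitioning is left untouched.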
The maximum size for a broadcast table is 8 GB. Spark also maintains an internal threshold on table size below which it automatically applies broadcast joins; the threshold can be configured using spark.sql.autoBroadcastJoinThreshold.
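For example, a hedged spark-defaults.conf sketch of raising that threshold (the 50 MB value is an arbitrary choice for illustration; countriesDf at ~5 MB would already sit under the usual 10 MB default, so in practice the plan should also be checked with explain()):

```properties
# spark-defaults.conf: allow tables up to ~50 MB to be auto-broadcast
spark.sql.autoBroadcastJoinThreshold  52428800
```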
Both the way you broadcast the DataFrame and the way you access it are incorrect. Standard broadcast variables cannot be used to handle distributed data structures. If you want to perform a broadcast join on a DataFrame, you should use the broadcast function, which marks the given DataFrame for broadcasting:
import org.apache.spark.sql.functions.broadcast

val countriesDf: DataFrame = ???
val tmp: DataFrame = broadcast(
  countriesDf.withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries")
)

txnTable.as("df1").join(
  broadcast(tmp), $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner")
Internally, Spark will collect tmp without converting it from the internal representation, and broadcast it afterwards.
join arguments are evaluated eagerly. Even if it were possible to use SparkContext.broadcast with a distributed data structure, the broadcast value would be evaluated locally before join is called. That's why your function works at all but doesn't perform a broadcast join.
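The eager-evaluation point can be seen with plain Scala, no Spark required: by-value arguments are fully evaluated before the callee runs, which is why countriesDfBroadcast.value is materialized on the driver before join ever executes. The names below are illustrative stand-ins, not Spark APIs.

```scala
import scala.collection.mutable.Buffer

// Records the order in which things happen.
val events = Buffer[String]()

// Stand-in for countriesDfBroadcast.value: evaluated first, locally.
def materializeValue(): Int = { events += "argument evaluated"; 42 }

// Stand-in for join: by the time it runs, its argument is already a plain local value.
def joinLike(x: Int): Int = { events += "join called"; x }

joinLike(materializeValue())
```

The recorded order shows the argument is evaluated before the "join" is entered, mirroring why the poster's function runs but never reaches Spark's broadcast-join machinery.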