 

Broadcast not happening while joining dataframes in Spark 1.6

Below is the sample code I am running. When this Spark job runs, the DataFrame join is executed as a SortMergeJoin instead of a BroadcastJoin.

def joinedDf(sqlContext: SQLContext,
             txnTable: DataFrame,
             countriesDfBroadcast: Broadcast[DataFrame]): DataFrame = {
  txnTable.as("df1").join(
    countriesDfBroadcast.value
      .withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID")
      .as("countries"),
    $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID",
    "inner")
}

joinedDf(sqlContext, txnTable, countriesDfBroadcast).write.parquet("temp")

The broadcast join does not happen even when I specify a broadcast() hint in the join statement.

The optimizer is hash-partitioning the DataFrame, and this is causing data skew.

Has anyone seen this behavior?

I am running this on YARN using Spark 1.6 with HiveContext as the SQLContext. The job runs on 200 executors; txnTable is 240 GB and countriesDf is 5 MB.
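One way to confirm which strategy the planner actually chose is to print the physical plan. This is a minimal sketch, assuming a running `sqlContext` and the `txnTable`/`countriesDf` DataFrames from above; when the hint is honored the plan mentions BroadcastHashJoin, otherwise SortMergeJoin:

```scala
// Sketch: inspect the physical plan to see which join strategy Spark picked.
// Assumes `sqlContext`, `txnTable`, and `countriesDf` exist as in the question.
import org.apache.spark.sql.functions.broadcast
import sqlContext.implicits._  // needed for the $"..." column syntax

val joined = txnTable.as("df1").join(
  broadcast(countriesDf.withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries")),
  $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID",
  "inner")

// Prints the physical plan; look for BroadcastHashJoin vs. SortMergeJoin.
joined.explain()
```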

Prasad R. asked Feb 05 '16 23:02

People also ask

How do I enable broadcast join in PySpark?

Syntax for a PySpark broadcast join: B1 is the first DataFrame used in the join, B is the second, broadcasted DataFrame, join is the join operation, and broadcast is the keyword that marks the DataFrame for broadcasting.

Can I broadcast a DataFrame in Spark?

Broadcast variables are used in the same way for RDDs, DataFrames, and Datasets. When you run a Spark RDD or DataFrame job that has broadcast variables defined and used, Spark breaks the job into stages that have distributed shuffling, and actions are executed within each stage.

How does broadcast join work in Spark?

A broadcast join in Spark is preferred when we want to join one small DataFrame with a large one. The requirement is that the small DataFrame must fit easily in memory, so that it can be joined with the large DataFrame on each executor to boost the performance of the join.
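The mechanism above can be sketched in plain Scala, without Spark: the small side becomes a local hash map that every task can probe, so the large side is never shuffled. The sample data here is made up purely for illustration:

```scala
// Conceptual sketch of a broadcast (map-side) hash join.
val countries = Seq((1, "US"), (2, "DE"))          // small side (broadcast)
val txns      = Seq((100, 1), (101, 2), (102, 1))  // large side (streamed)

// The small side is built into a hash map once and shipped to each task.
val lookup: Map[Int, String] = countries.toMap

// Each partition of the large side probes the map locally; matches are kept
// (an inner join), so no shuffle of the large side is required.
val joined = txns.flatMap { case (txnId, cntryId) =>
  lookup.get(cntryId).map(name => (txnId, name))
}
// joined == Seq((100, "US"), (101, "DE"), (102, "US"))
```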

What is the threshold size limit for broadcast join in Spark?

The maximum size for a broadcast table is 8 GB. Spark also internally maintains a threshold on table size to apply broadcast joins automatically. The threshold can be configured using spark.sql.autoBroadcastJoinThreshold.
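The threshold can be set at submit time or within a session. A sketch (the job name is hypothetical; 10 MB is Spark's default, and -1 disables automatic broadcasting):

```shell
# Raise the automatic broadcast-join threshold to 50 MB (default is 10 MB).
spark-submit \
  --conf spark.sql.autoBroadcastJoinThreshold=52428800 \
  my_job.jar

# Or from within a session:
# sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "52428800")
```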


1 Answer

Both the way you broadcast the DataFrame and the way you access it are incorrect.

  • Standard broadcasts cannot be used to handle distributed data structures. If you want to perform a broadcast join on a DataFrame, you should use the broadcast function, which marks the given DataFrame for broadcasting:

    import org.apache.spark.sql.functions.broadcast

    val countriesDf: DataFrame = ???
    val tmp: DataFrame = broadcast(
      countriesDf.withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries")
    )

    // tmp is already marked for broadcasting, so no second broadcast() is needed.
    txnTable.as("df1").join(
      tmp, $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner")


    Internally, Spark will collect tmp without converting it from the internal representation, and broadcast it afterwards.

  • join arguments are eagerly evaluated. Even if it were possible to use SparkContext.broadcast with a distributed data structure, the broadcast value would be evaluated locally before join is called. That is why your function works at all, but does not perform a broadcast join.

zero323 answered Sep 28 '22 10:09