
How to access broadcasted DataFrame in Spark

I created two DataFrames from Hive tables (PC_ITM and ITEM_SELL), both large, and I use them frequently in SQL queries by registering them as tables. Because they are big, the queries take a long time to return results. So I saved them as Parquet files, read them back, and registered them as temporary tables. But performance was still poor, so I broadcast the DataFrames and then registered them as tables, as shown below.

val PC_ITM_DF = sqlContext.parquetFile("path")
val PC_ITM_BC = sc.broadcast(PC_ITM_DF)
val PC_ITM_DF1 = PC_ITM_BC.value
PC_ITM_DF1.registerTempTable("PC_ITM")

val ITM_SELL_DF = sqlContext.parquetFile("path")
val ITM_SELL_BC = sc.broadcast(ITM_SELL_DF)
val ITM_SELL_DF1 = ITM_SELL_BC.value
ITM_SELL_DF1.registerTempTable("ITM_SELL")


sqlContext.sql("JOIN Query").show

But I still see no improvement: the query takes the same time as when the DataFrames are not broadcast.

Can anyone tell me whether this is the right way to broadcast a DataFrame and use it?

asked Jan 21 '16 by Raghavendra Kulkarni


People also ask

How do I view a DataFrame in Spark?

You can visualize a Spark DataFrame in Jupyter notebooks by using the display(<dataframe-name>) function. The display() function is supported only on PySpark kernels. The Qviz framework supports up to 1,000 rows and 100 columns. By default, the DataFrame is visualized as a table.
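Note that display() is notebook-specific; in a plain Spark shell or script, df.show() is the portable way to inspect a DataFrame. A minimal sketch using the Spark 2.x SparkSession API (the DataFrame contents here are made up):

```scala
import org.apache.spark.sql.SparkSession

// Local session for demonstration purposes
val spark = SparkSession.builder().master("local[*]").appName("show-demo").getOrCreate()
import spark.implicits._

val df = Seq(("pen", 10), ("book", 25)).toDF("item", "price")

// Print the first rows as an ASCII table (show(n) limits the row count)
df.show()

val rowCount = df.count()
```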

Can we broadcast a DataFrame in Spark?

Broadcast variables are used in the same way for RDDs, DataFrames, and Datasets. When you run a Spark job that defines and uses broadcast variables, Spark breaks the job into stages separated by distributed shuffles, and actions are executed within each stage.
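As a concrete illustration of a broadcast variable, here is a small lookup map shipped once to every executor rather than with every task (the data is made up):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("bc-var-demo").getOrCreate()
val sc = spark.sparkContext

// Broadcast a small lookup table once, instead of shipping it with every task
val countryNames = sc.broadcast(Map("US" -> "United States", "IN" -> "India"))

val resolved = sc.parallelize(Seq("US", "IN", "XX"))
  .map(code => countryNames.value.getOrElse(code, "Unknown"))
  .collect()

resolved.foreach(println)
```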

How do I get data from Spark DataFrame?

Spark's collect() and collectAsList() are actions that retrieve all elements of an RDD/DataFrame/Dataset (from all nodes) to the driver node. Use collect() only on small datasets, usually after filter(), groupBy(), count(), etc.; calling it on a large dataset can cause an out-of-memory error on the driver.
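For example, filtering before collect() keeps the result small enough for the driver (toy data, hypothetical column names):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("collect-demo").getOrCreate()
import spark.implicits._

val sales = Seq(("A", 10), ("B", 200), ("C", 30)).toDF("item", "qty")

// Filter first so that only a small result set is pulled back to the driver
val small = sales.filter($"qty" < 100).collect() // Array[Row] on the driver

small.foreach(println)
```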

Where are broadcast variables stored in Spark?

A broadcast variable is stored on the driver's BlockManager as a single value and, separately, as chunks of spark.broadcast.blockSize.


1 Answer

You don't really need to 'access' the broadcast dataframe: you just use it, and Spark implements the broadcast under the hood. The broadcast function works nicely, and makes more sense than the sc.broadcast approach (sc.broadcast serializes the driver-side DataFrame object, not the data it represents, so it does not speed up the join).

It can be hard to understand where the time is being spent if you evaluate everything at once.

You can break your code into steps. The key is to perform an action and persist the dataframes you want to broadcast before using them in your join.

// load your dataframe
val PC_ITM_DF = sqlContext.parquetFile("path")

// mark this dataframe to be stored in memory once evaluated
PC_ITM_DF.persist()

// mark this dataframe to be broadcast when it is joined
import org.apache.spark.sql.functions.broadcast
val PC_ITM_B = broadcast(PC_ITM_DF)

// register it for use in SQL queries
PC_ITM_B.registerTempTable("PC_ITM")

// perform an action to force the evaluation
PC_ITM_DF.count()

Doing this will ensure that the dataframe is

  • loaded in memory (persist)
  • registered as temp table for use in your SQL query
  • marked as broadcast, so will be shipped to all executors

When you now run sqlContext.sql("JOIN Query").show, you should see a 'broadcast hash join' in the SQL tab of the Spark UI.
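Put together, a broadcast join with the broadcast() hint looks like this (Spark 2.x API with toy data; the answer above reads the dataframes from Parquet instead):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().master("local[*]").appName("bc-join-demo").getOrCreate()
import spark.implicits._

val pcItm = Seq((1, "pen"), (2, "book")).toDF("item_id", "item_name")
val itmSell = Seq((1, 100), (2, 250)).toDF("item_id", "amount")

// The broadcast() hint tells the planner to ship pcItm to every executor,
// so the join should compile to a broadcast hash join
val joined = itmSell.join(broadcast(pcItm), "item_id")

joined.explain() // the physical plan should include BroadcastHashJoin
val rows = joined.collect()
```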

answered Nov 13 '22 by Kirk Broadhurst