Efficient way to join a cached spark dataframe with other and cache again

I have a large dataframe that has been cached, like:

val largeDf = someLargeDataframe.cache

Now I need to union it with a tiny one and cache it again:

val tinyDf = someTinyDataframe.cache
val newDataframe = largeDf.union(tinyDf).cache
tinyDf.unpersist()
largeDf.unpersist()

This is very inefficient since it needs to re-cache all the data again. Is there an efficient way to add a small amount of data to a large cached dataframe?


After reading Teodors's explanation, I know that I can't unpersist the old dataframe before I do some action on my new dataframe. But what if I need to do something like this?

def myProcess(df1: DataFrame, df2: DataFrame): DataFrame = {
    val df1_trans = df1.map(....).cache
    val df2_trans = df2.map(....).cache

    doSomeAction(df1_trans, df2_trans)

    val finalDf = df1_trans.union(df2_trans).map(....).cache
    // df1_trans.unpersist()
    // df2_trans.unpersist()
    finalDf
}

I want df1_trans and df2_trans to be cached to improve performance inside the function, since each of them is used more than once. But the dataframe I return at the end is also built from df1_trans and df2_trans: if I can't unpersist them before leaving the function, there is no other place to do it, and if I do unpersist them here, my finalDf won't benefit from their caches.

What can I do in this situation? Thanks!
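For reference, the only workaround I can see is to force an action on finalDf before unpersisting its parents; a rough sketch (select("*") stands in for my real map(...) logic, and the extra count() is only there to materialize the cache):

import org.apache.spark.sql.DataFrame

def myProcessEager(df1: DataFrame, df2: DataFrame): DataFrame = {
    val df1_trans = df1.select("*").cache
    val df2_trans = df2.select("*").cache

    // doSomeAction(df1_trans, df2_trans) runs here, reusing both caches

    val finalDf = df1_trans.union(df2_trans).cache
    finalDf.count()          // extra action: fills finalDf's cache from the parent caches
    df1_trans.unpersist()    // safe now: finalDf no longer needs to recompute its parents
    df2_trans.unpersist()
    finalDf
}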

asked May 24 '17 by 林鼎棋


People also ask

How do you optimize a join in Spark?

Step 1, shuffling: the data from the joined tables is partitioned by the join key, shuffling rows across partitions so that records with the same join key end up in the same partition. Step 2, hash join: a classic single-node hash-join algorithm is then performed on the data in each partition.
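A quick way to see those two steps is to print the physical plan of a join; a sketch with made-up ranges (not taken from the question):

val left  = spark.range(0, 1000000).toDF("id")
val right = spark.range(0, 1000000).toDF("id")

// Look for Exchange hashpartitioning(id, ...) under the join node: both sides are
// repartitioned by the join key before the per-partition join runs. If one side is
// small enough, Spark broadcasts it instead and the big side is not shuffled.
left.join(right, "id").explain()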

How do I join two data frames in Spark?

PySpark's join is used to combine two DataFrames, and by chaining joins you can combine multiple DataFrames; it supports all the basic join types available in traditional SQL, such as INNER, LEFT OUTER, RIGHT OUTER, LEFT ANTI, LEFT SEMI, CROSS and SELF JOIN.
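The Scala API works the same way; a small sketch with made-up data showing chained joins and explicit join types:

import spark.implicits._   // spark is an existing SparkSession

val users  = Seq((1, "ann"), (2, "bob"), (3, "cho")).toDF("userId", "name")
val orders = Seq((1, 9.99), (1, 5.00)).toDF("userId", "amount")
val tags   = Seq((2, "vip")).toDF("userId", "tag")

val result = users
  .join(orders, Seq("userId"), "left_outer")  // keep users even if they have no orders
  .join(tags,   Seq("userId"), "left_anti")   // then keep only rows with no matching tag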

Which of the following are the correct ways to cache data tables in Spark SQL?

You should use sqlContext.cacheTable("table_name") to cache it, or alternatively the CACHE TABLE table_name SQL query.
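A short sketch of both variants (the table name is made up; spark.catalog.cacheTable is the Spark 2.x equivalent of sqlContext.cacheTable):

someLargeDataframe.createOrReplaceTempView("my_table")

spark.catalog.cacheTable("my_table")   // API route
// or
spark.sql("CACHE TABLE my_table")      // SQL route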

What is powerful caching in Spark?

Caching methods in Spark:
- DISK_ONLY: persist data on disk only, in serialized format.
- MEMORY_ONLY: persist data in memory only, in deserialized format.
- MEMORY_AND_DISK: persist data in memory; if not enough memory is available, evicted blocks are stored on disk.
- OFF_HEAP: data is persisted in off-heap memory.
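In code these map onto persist(StorageLevel.<LEVEL>); cache() on a DataFrame is shorthand for the MEMORY_AND_DISK default. A sketch:

import org.apache.spark.storage.StorageLevel

someLargeDataframe.persist(StorageLevel.DISK_ONLY)        // serialized, on disk only
someLargeDataframe.unpersist()
someLargeDataframe.persist(StorageLevel.MEMORY_ONLY)      // deserialized, in memory only
someLargeDataframe.unpersist()
someLargeDataframe.persist(StorageLevel.MEMORY_AND_DISK)  // evicted blocks spill to disk
someLargeDataframe.unpersist()
someLargeDataframe.persist(StorageLevel.OFF_HEAP)         // needs off-heap memory enabled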


2 Answers

val largeDf = someLargeDataframe.cache
val tinyDf = someTinyDataframe.cache
val newDataframe = largeDf.union(tinyDf).cache

If you call unpersist() now, before any action has gone through your whole largeDf dataframe, you won't benefit from caching the two dataframes.

tinyDf.unpersist()
largeDf.unpersist()

I wouldn't worry about caching the unioned dataframe: as long as the two other dataframes are already cached, you likely won't see a performance hit.

Benchmark the following:

========= now (your current code) ============
val largeDf = someLargeDataframe.cache
val tinyDf = someTinyDataframe.cache
val newDataframe = largeDf.union(tinyDf).cache
tinyDf.unpersist()
largeDf.unpersist()
// force evaluation
newDataframe.count()

========= alternative 1 ============
val largeDf = someLargeDataframe.cache
val tinyDf = someTinyDataframe.cache
val newDataframe = largeDf.union(tinyDf).cache

// force evaluation
newDataframe.count()
tinyDf.unpersist()
largeDf.unpersist()

======== alternative 2 ==============
val largeDf = someLargeDataframe.cache
val tinyDf = someTinyDataframe.cache
val newDataframe = largeDf.union(tinyDf)

newDataframe.count()


======== alternative 3 ==============
val largeDf = someLargeDataframe
val tinyDf = someTinyDataframe
val newDataframe = largeDf.union(tinyDf).cache

// force evaluation
newDataframe.count()
answered Sep 27 '22 by Boggio


Is there an efficient way to add a small amount of data to a large cached dataframe?

I don't think any other operation could beat union. I did think that broadcast function might help here, but after having a look at the execution plan I don't think so anymore.

That led me to write this answer: if you want to know whether caching has any effect on a query, explain it:

explain(): Unit
Prints the physical plan to the console for debugging purposes.

In the following example, broadcast does not affect union (which is not surprising, given it's a hint for joins that other physical operators simply ignore).

scala> left.union(broadcast(right)).explain
== Physical Plan ==
Union
:- *Range (0, 4, step=1, splits=8)
+- *Range (0, 3, step=1, splits=8)

It's also worthwhile to use the Details for Query page under the SQL tab of the Spark web UI.

(screenshot: the Details for Query page under the SQL tab in the Spark web UI)
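Besides the plan and the web UI, a couple of programmatic checks can be used (a sketch, assuming Spark 2.1+; the table name is hypothetical):

println(newDataframe.storageLevel)     // StorageLevel.NONE means it is not cached

spark.catalog.isCached("my_table")     // for tables/views registered in the catalog

newDataframe.explain()                 // a cached plan shows InMemoryRelation / InMemoryTableScan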

answered Sep 27 '22 by Jacek Laskowski