I have a large dataframe that has been cached, like this:
val largeDf = someLargeDataframe.cache
Now I need to union it with a tiny dataframe and cache it again:
val tinyDf = someTinyDataframe.cache
val newDataframe = largeDf.union(tinyDf).cache
tinyDf.unpersist()
largeDf.unpersist()
This is very inefficient, since it needs to re-cache all the data. Is there an efficient way to add a small amount of data to a large cached dataframe?
After reading Teodors's explanation, I know that I can't unpersist the old dataframe before I do some action on my new dataframe. But what if I need to do something like this?
def myProcess(df1: DataFrame, df2: DataFrame): DataFrame = {
  val df1_trans = df1.map(....).cache
  val df2_trans = df2.map(....).cache
  doSomeAction(df1_trans, df2_trans)
  val finalDf = df1_trans.union(df2_trans).map(....).cache
  // df1_trans.unpersist()
  // df2_trans.unpersist()
  finalDf
}
I want df1_trans and df2_trans to be cached to improve performance inside the function, since each is used more than once. But the dataframe I return at the end is also built from df1_trans and df2_trans. If I can't unpersist them before leaving the function, there is no other place to do it; yet if I do unpersist them here, finalDf will not benefit from their caches.
What can I do in this situation? Thanks!
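One sketch of a possible workaround (the transformations here are placeholders, and doSomeAction is a stand-in for the real work): force an action on finalDf inside the function, so its cache is fully materialized before the two intermediates are unpersisted:
import org.apache.spark.sql.DataFrame

// stand-in for the real action performed on the two intermediates
def doSomeAction(a: DataFrame, b: DataFrame): Unit = { a.count(); b.count() }

def myProcess(df1: DataFrame, df2: DataFrame): DataFrame = {
  val df1_trans = df1.select("*").cache()   // placeholder for df1.map(....)
  val df2_trans = df2.select("*").cache()   // placeholder for df2.map(....)
  doSomeAction(df1_trans, df2_trans)        // both intermediates reused here
  val finalDf = df1_trans.union(df2_trans).cache()
  finalDf.count()                           // action: materializes finalDf's cache
  // Safe now: finalDf's cached blocks no longer depend on the intermediates
  // (unless blocks are evicted later and have to be recomputed from source).
  df1_trans.unpersist()
  df2_trans.unpersist()
  finalDf
}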
val largeDf = someLargeDataframe.cache
val tinyDf = someTinyDataframe.cache
val newDataframe = largeDf.union(tinyDf).cache
If you call unpersist() now, before any action that scans your whole largeDf dataframe, you won't benefit from having cached the two dataframes.
tinyDf.unpersist()
largeDf.unpersist()
I wouldn't worry about caching the unioned dataframe; as long as the two other dataframes are already cached, you likely won't see a performance hit.
Benchmark the following:
========= now? ============
val largeDf = someLargeDataframe.cache
val tinyDf = someTinyDataframe.cache
val newDataframe = largeDf.union(tinyDf).cache
tinyDf.unpersist()
largeDf.unpersist()
// force evaluation
newDataframe.count()
========= alternative 1 ============
val largeDf = someLargeDataframe.cache
val tinyDf = someTinyDataframe.cache
val newDataframe = largeDf.union(tinyDf).cache
// force evaluation
newDataframe.count()
tinyDf.unpersist()
largeDf.unpersist()
======== alternative 2 ==============
val largeDf = someLargeDataframe.cache
val tinyDf = someTinyDataframe.cache
val newDataframe = largeDf.union(tinyDf)
newDataframe.count()
======== alternative 3 ==============
val largeDf = someLargeDataframe
val tinyDf = someTinyDataframe
val newDataframe = largeDf.union(tinyDf).cache
// force evaluation
newDataframe.count()
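To sanity-check which dataframes are still marked as persisted after each alternative, Dataset.storageLevel (available since Spark 2.1) reports the current persistence level; it returns StorageLevel.NONE once a dataframe is unpersisted:
import org.apache.spark.storage.StorageLevel

// true once unpersist() has run, false while the dataframe is still cached
println(largeDf.storageLevel == StorageLevel.NONE)
println(newDataframe.storageLevel == StorageLevel.NONE)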
Is there an efficient way to add a small amount of data to a large cached dataframe?
I don't think any other operation could beat union. I did think the broadcast function might help here, but after having a look at the execution plan I don't think so anymore.
That led me to write the answer. If you want to know if your caching has any effect on a query, explain it:
explain(): Unit Prints the physical plan to the console for debugging purposes.
With the following example, broadcast does not affect union (which is not surprising, given that it's a hint for joins and other physical operators just ignore it).
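For the plan below, left and right are assumed to be small range datasets, matching the Range operators in the output:
import org.apache.spark.sql.functions.broadcast

val left = spark.range(4)   // assumed setup, matches Range (0, 4) in the plan
val right = spark.range(3)  // assumed setup, matches Range (0, 3) in the plan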
scala> left.union(broadcast(right)).explain
== Physical Plan ==
Union
:- *Range (0, 4, step=1, splits=8)
+- *Range (0, 3, step=1, splits=8)
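And as a quick illustration of spotting cache effects in a plan (a sketch; the exact output varies by Spark version), a cached dataset shows up as an InMemoryTableScan over an InMemoryRelation:
scala> val cached = spark.range(5).cache
scala> cached.explain
== Physical Plan ==
InMemoryTableScan [id#0L]
   +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Range (0, 5, step=1, splits=8)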
It's also worthwhile to check Details for Query under the SQL tab of the Spark web UI.