I could not find any discussion on the topic below in any forum I searched on the internet. It may be because I am new to Spark and Scala and not asking a valid question. If there are any existing threads discussing the same or a similar topic, links would be very helpful. :)
I am working on a process that uses Spark and Scala to create a file by reading many tables and deriving many fields by applying logic to the data fetched from those tables. The structure of my code looks like this:
val driver_sql = "SELECT ..."
var df_res = spark.sql(driver_sql)
df_res = df_res.withColumn("Col1", <logic>)
df_res = df_res.withColumn("Col2", <logic>)
df_res = df_res.withColumn("Col3", <logic>)
.
.
.
df_res = df_res.withColumn("Col20", <logic>)
Basically, there is a driver query which creates the "driver" dataframe. After that, separate logic (functions) is executed based on a key or keys in the driver dataframe to add new columns/fields. The "logic" part is not always one-line code; sometimes it is a separate function that runs another query, does some kind of join on df_res, and adds a new column. The record count also changes, since I use an "inner" join with other tables/dataframes in some cases. A rough sketch of one such function is below.
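To give an idea of what one of those "logic" functions looks like, here is a rough sketch (the table, key, and column names are made up for illustration, not my actual code; spark is the same session used above):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

def addRateColumn(dfRes: DataFrame): DataFrame = {
  val rates = spark.sql("SELECT acct_key, rate FROM rates_table")  // hypothetical lookup query
  dfRes.join(rates, Seq("acct_key"), "inner")                      // inner join can change the record count
       .withColumn("Col21", col("rate") * lit(1.05))               // derive the new field
}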
So, here are my questions:
1. Is it a good idea to cache/persist df_res at any point in time?
2. Does it make sense to persist df_res again and again after columns are added? I mean, does it add value?
3. If I persist df_res (disk only) every time a new column is added, is the data on the disk replaced? Or does it create a new copy/version of df_res on the disk?

Spark provides a convenient way to work on a dataset by persisting it in memory across operations. When an RDD is persisted, each node stores any partitions of it that it computes in memory, and those partitions can then be reused in other tasks on that dataset.
cache() and persist() are the two dataframe persistence methods in Apache Spark. Using these methods, Spark can store the intermediate computation of a dataframe so it can be reused in subsequent actions.
The only difference between cache() and persist() is that cache() uses the default storage level, while persist() lets you choose a specific storage level such as MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, or DISK_ONLY.
Spark automatically monitors the persist() and cache() calls you make, checks usage on each node, and drops persisted data that is no longer used, following a least-recently-used (LRU) policy. You can also remove persisted data manually with the unpersist() method.
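For example, roughly (using a toy dataframe built with spark.range instead of a real table, and assuming a SparkSession named spark as in spark-shell):

import org.apache.spark.storage.StorageLevel

val df = spark.range(1000).toDF("id")   // toy dataframe standing in for a real query

df.persist(StorageLevel.DISK_ONLY)      // pick an explicit storage level (cache() would use the default)
df.count()                              // persistence actually happens on the first action

// ... reuse df in later computations ...

df.unpersist()                          // manually drop the persisted data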
The first thing to note is that persisting a dataframe helps when you are going to apply iterative operations on that dataframe.
What you are doing here is applying transformation operations to your dataframe, one after another. There is no need to persist the dataframe at each of those steps.
For example, persisting would be helpful if you are doing something like this:
val df = spark.sql("select * from ...").persist()
df.count()    // action: df is actually persisted here

val df1 = df.select("..").withColumn("xyz", udf(..))
df1.count()   // reuses the persisted df

val df2 = df.select("..").withColumn("abc", udf2(..))
df2.count()   // reuses the persisted df again
Now, persisting df here is beneficial when calculating df1 and df2, because both reuse it. One more thing to notice: the reason I call df.count is that a dataframe is persisted only when an action is applied to it. From the Spark docs: "The first time it is computed in an action, it will be kept in memory on the nodes." This answers your second question as well.
Every time you persist, a new copy will be created, but you should unpersist the previous one first.
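A rough sketch of that pattern, following the order suggested above (driver_sql comes from the question, and lit(0) is just a stand-in for the real derivation logic):

import org.apache.spark.sql.functions.lit

var df_res = spark.sql(driver_sql).persist()
df_res.count()              // materialize the first persisted copy

val updated = df_res.withColumn("Col1", lit(0))
df_res.unpersist()          // drop the previous persisted copy first
updated.persist()
updated.count()             // the new copy is materialized by this action (recomputed from the source)

df_res = updated            // continue working with the latest version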