Recently I saw some strange behaviour of Spark.
I have a pipeline in my application in which I'm manipulating one big Dataset - pseudocode:
val data = spark.read(...)
  .join(df1, "key") // etc., more transformations
data.cache() // so that data is not recomputed after the save
data.write.parquet(...) // some save
val extension = data.join(...) // more transformations: joins, selects, etc.
extension.cache() // again, cache to avoid computing it twice
extension.count()
// (1)
extension.write.csv(...) // some other save
val aggregated = extension.groupBy("key").agg(...) // some aggregations
aggregated.write.parquet(...) // other save; without the cache it would trigger recomputation of the whole dataset
However, when I call data.unpersist(), i.e. at point (1), Spark removes all datasets from Storage, including the extension Dataset, which is not the one I tried to unpersist.
Is this expected behaviour? How can I free some memory by unpersisting the old Dataset without also unpersisting every Dataset that is "next in the chain"?
My setup:
The question looks similar to Understanding Spark's caching, but here I'm running some actions before the unpersist: first I count everything and then I save to storage. I don't know whether caching works the same way for Datasets as it does for RDDs.
persist() stores the data at a storage level (by default memory only for RDDs, memory and disk for Datasets). unpersist() marks the DataFrame or Dataset as non-persistent and removes all blocks for it from memory and disk. Spark automatically monitors every persist() and cache() call made by the user and checks cache usage on each node.
This is expected behaviour of Spark caching. Spark doesn't want to keep invalid cached data, so it removes every cached plan that refers to the dataset. This is to make sure query results stay correct. In the example you create the extension dataset from the cached dataset data. If data is then unpersisted, extension can no longer rely on the cached data.
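One way to see which behaviour a given Spark version has is to check storageLevel on both datasets after unpersisting the parent. The sketch below is a minimal reproduction, not the asker's pipeline: the tiny in-memory DataFrame, the extra column and the UnpersistCheck object are made up for illustration, and it assumes a local Spark session is available.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object UnpersistCheck {
  // Returns (is data still cached?, is extension still cached?)
  // as observed right after data.unpersist().
  def cachedAfterUnpersist(spark: SparkSession): (Boolean, Boolean) = {
    import spark.implicits._

    // Hypothetical stand-in for the question's big dataset.
    val data = Seq((1, "a"), (2, "b")).toDF("key", "value")
    data.cache()
    data.count() // materialize the cache

    // Hypothetical stand-in for the derived dataset.
    val extension = data.withColumn("extra", $"key" * 2)
    extension.cache()
    extension.count() // materialize before unpersisting the parent

    data.unpersist(blocking = true)

    val result = (
      data.storageLevel != StorageLevel.NONE,
      extension.storageLevel != StorageLevel.NONE
    )
    extension.unpersist(blocking = true) // clean up
    result
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("unpersist-check")
      .getOrCreate()
    try {
      val (dataCached, extensionCached) = cachedAfterUnpersist(spark)
      println(s"Spark ${spark.version}: data cached = $dataCached, " +
        s"extension cached = $extensionCached")
    } finally {
      spark.stop()
    }
  }
}
```

If the second flag comes back false, the unpersist cascaded to extension as described in this answer; the result depends on the Spark version in use.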
Here is the pull request for the fix they made. You can also see a similar JIRA ticket.
Answer for Spark 2.4:
There was a ticket about correctness in Datasets and caching behaviour, see https://issues.apache.org/jira/browse/SPARK-24596
According to Maryann Xue's description, caching now works in the following manner:
Here "regular mode" means the mode from the question and @Avishek's answer, and "non-cascading mode" means that extension won't be unpersisted.
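Since the behaviour differs by version, code that has to run on several Spark versions could guard the early unpersist. The helper below is a hypothetical sketch: the UnpersistMode object and the unpersistIsNonCascading name are not part of Spark. It assumes that the non-cascading behaviour is present from 2.4.0 onward, as this answer and SPARK-24596 indicate, and that the version string starts with "major.minor".

```scala
object UnpersistMode {
  // Hypothetical helper: true when Dataset.unpersist() is assumed to leave
  // dependent caches alone (Spark 2.4 and later, per SPARK-24596).
  def unpersistIsNonCascading(sparkVersion: String): Boolean = {
    // Keep only the leading digits of each part, so "2.4.0-SNAPSHOT" also parses.
    val parts = sparkVersion.split("\\.").map(_.takeWhile(_.isDigit))
    require(
      parts.length >= 2 && parts(0).nonEmpty && parts(1).nonEmpty,
      s"Unexpected version string: $sparkVersion"
    )
    val major = parts(0).toInt
    val minor = parts(1).toInt
    major > 2 || (major == 2 && minor >= 4)
  }
}
```

With a live session this might be used as `if (UnpersistMode.unpersistIsNonCascading(spark.version)) data.unpersist()`; on older versions the parent cache would be kept until extension is no longer needed.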