 

Spark's Dataset unpersist behaviour

Recently I saw some strange behaviour of Spark.

I have a pipeline in my application in which I'm manipulating one big Dataset - pseudocode:

val data = spark.read.load(...)      // load the big Dataset
  .join(df1, "key")                  // etc., more transformations
data.cache()                         // cache so data is not recalculated after the save
data.write.parquet(...)              // first save

val extension = data.join(...)       // more transformations - joins, selects, etc.
extension.cache()                    // again, cache to avoid recomputing
extension.count()
// (1)
extension.write.csv(...)             // some other save

extension.groupBy("key").agg(...)    // some aggregations
  .write.parquet(...)                // other save; without the cache it would trigger recomputation of the whole dataset

However, when I call data.unpersist(), e.g. at place (1), Spark removes all Datasets from storage, including the extension Dataset, which is not the one I tried to unpersist.
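A minimal, hypothetical way to observe this in a spark-shell (the column names and sizes are made up; Dataset.storageLevel reports StorageLevel.NONE when a Dataset has no cached data):

import org.apache.spark.sql.functions.col

val data = spark.range(0, 1000000).toDF("key")              // stand-in for the big Dataset
val extension = data.withColumn("doubled", col("key") * 2)  // stand-in for the derived Dataset

data.cache()
extension.cache()
extension.count()                 // action that materializes both caches

println(extension.storageLevel)   // a cached level, e.g. MEMORY_AND_DISK
data.unpersist()
println(extension.storageLevel)   // NONE here on Spark 2.3 - extension's cache was dropped as well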

Is that expected behaviour? How can I free some memory by unpersisting an old Dataset without also unpersisting every Dataset that was "next in the chain"?

My setup:

  • Spark version: current master, RC for 2.3
  • Scala: 2.11
  • Java: OpenJDK 1.8

The question looks similar to Understanding Spark's caching, but here I'm performing some actions before the unpersist. First I count everything and then save to storage - I don't know whether caching works the same for RDDs as for Datasets.

T. Gawęda asked Jan 17 '18


People also ask

Is Unpersist an action in Spark?

Marks the DataFrame as non-persistent, and removes all blocks for it from memory and disk.

What is persist and Unpersist in Spark?

The persist() function stores the data in memory. unpersist() marks the DataFrame or Dataset as non-persistent and removes all blocks for it from memory and disk. Spark automatically monitors every persist() and cache() call made by the user and checks usage on each node. A short sketch of these calls follows the questions below.

What is Unpersist?

(transitive, computing) To remove from permanent storage; to make temporary again.
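For reference, a small hedged sketch of those calls (the path and storage level are just illustrative; assumes a SparkSession named spark):

import org.apache.spark.storage.StorageLevel

val df = spark.read.parquet("/tmp/input")    // hypothetical input
df.persist(StorageLevel.MEMORY_AND_DISK)     // same level Dataset.cache() uses by default
df.count()                                   // an action actually materializes the cache

df.unpersist(blocking = true)                // drop the cached blocks and wait until they are removed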


2 Answers

This is expected behaviour of Spark caching. Spark doesn't want to keep invalid cached data, so it completely removes all cached plans that refer to the dataset.

This is to make sure queries stay correct. In the example you create the extension Dataset from the cached Dataset data. Once data is unpersisted, extension essentially can no longer rely on the cached data.
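One possible workaround - not part of this answer, just a hedged sketch using the names from the question - is to cut extension's lineage before unpersisting data, e.g. with a reliable checkpoint, so its cached plan no longer refers to data:

spark.sparkContext.setCheckpointDir("/tmp/checkpoints")  // hypothetical directory

val checkpointed = extension.checkpoint()                // eager by default: materializes and truncates the plan
checkpointed.cache()
checkpointed.count()

data.unpersist()                                         // should no longer drop checkpointed's cache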

Here is the pull request for the fix they made. You can also see the related JIRA ticket.

Avishek Bhattacharya answered Sep 22 '22


Answer for Spark 2.4:

There was a ticket about correctness in Datasets and caching behaviour, see https://issues.apache.org/jira/browse/SPARK-24596

From Maryann Xue's description, caching now works in the following manner:

  1. Drop tables and regular (persistent) views: regular mode
  2. Drop temporary views: non-cascading mode
  3. Modify table contents (INSERT/UPDATE/MERGE/DELETE): regular mode
  4. Call DataSet.unpersist(): non-cascading mode
  5. Call Catalog.uncacheTable(): follow the same convention as dropping tables/views, i.e. use non-cascading mode for temporary views and regular mode for the rest

Where "regular mode" means mdoe from the questions and @Avishek's answer and non-cascading mode means, that extension won't be unpersisted

T. Gawęda answered Sep 20 '22