Should cache and checkpoint be used together on Datasets? If so, how does this work under the hood?

I am working on a Spark ML pipeline where we get OOM errors on larger data sets. Before training we were using cache(); I swapped this out for checkpoint() and our memory requirements went down significantly. However, the docs for RDD's checkpoint() say:

It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.

The same guidance is not given for Dataset's checkpoint(), which is what I am using. Following the above advice anyway, I found that the memory requirements actually increased slightly compared to using cache() alone.

My expectation was that when we do

...
ds.cache()
ds.checkpoint()
...

the call to checkpoint forces evaluation of the Dataset, which is cached at the same time before being checkpointed. Afterwards, any reference to ds would use the cached partitions, and if more memory is required and the partitions are evicted, the checkpointed partitions would be used rather than re-evaluating them. Is this true, or does something different happen under the hood? Ideally I'd like to keep the Dataset in memory if possible, but it seems there is no benefit whatsoever from a memory standpoint to using the cache and checkpoint approach together.
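For concreteness, the pattern in question looks roughly like this (a sketch: the session setup, checkpoint directory path, and input data are placeholders, not the asker's actual pipeline; note that a checkpoint directory must be configured before checkpoint() is called):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("cache-and-checkpoint")
  .master("local[*]")            // placeholder; use your cluster settings
  .getOrCreate()

// Reliable checkpointing requires a checkpoint directory; Spark throws
// an exception if checkpoint() is called before one is set.
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

val ds = spark.range(1000000L).toDF("id")  // stand-in for the real pipeline input

ds.cache()                         // marks ds for caching (MEMORY_AND_DISK by default)
val checkpointed = ds.checkpoint() // eager by default: materializes ds and writes checkpoint files

// Unlike cache(), checkpoint() returns a *new* Dataset; downstream stages
// should use the returned value to benefit from the truncated lineage.
```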

asked Jun 21 '19 by oirectine





1 Answer

TL;DR You won't benefit from the in-memory cache (the default storage level for Dataset is MEMORY_AND_DISK anyway) in subsequent actions, but you should still consider caching if computing ds is expensive.
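The MEMORY_AND_DISK default can be verified directly on a cached Dataset (a sketch, assuming a live spark session; the example Dataset is arbitrary):

```scala
import org.apache.spark.storage.StorageLevel

val ds = spark.range(10L).toDF("id")
ds.cache()  // no explicit storage level given

// Dataset.cache() defaults to MEMORY_AND_DISK, unlike RDD.cache(),
// which defaults to MEMORY_ONLY.
assert(ds.storageLevel == StorageLevel.MEMORY_AND_DISK)
```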

Explanation

Your expectation that

ds.cache()
ds.checkpoint()
...

the call to checkpoint forces evaluation of the DataSet

is correct. Dataset.checkpoint comes in different flavors, which allow for both eager and lazy checkpointing; the default variant is eager:

def checkpoint(): Dataset[T] = checkpoint(eager = true, reliableCheckpoint = true)

Therefore subsequent actions should reuse checkpoint files.
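One detail worth spelling out: checkpoint() is not an in-place operation. It returns a new Dataset whose plan reads from the checkpoint files, so subsequent actions only reuse them if you act on the returned value (a sketch, assuming spark is a live session with a checkpoint directory already set):

```scala
// Eager, reliable variant: materialized immediately.
val checkpointed = ds.checkpoint()

// Lazy variant: checkpoint files are written on the first action instead.
val lazyCp = ds.checkpoint(eager = false)

checkpointed.count()  // served from checkpoint files, lineage truncated
ds.count()            // still uses the original plan (plus cache, if any)
```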

However, under the covers Spark simply applies checkpoint to the internal RDD, so the rules of evaluation don't change: Spark evaluates the action first, and then creates the checkpoint (that's why caching was recommended in the first place).

So if you omit ds.cache(), ds will be evaluated twice in ds.checkpoint():

  • Once for internal count.
  • Once for actual checkpoint.

Therefore nothing has changed and cache is still recommended, although the recommendation might be slightly weaker than for a plain RDD: Dataset cache is considered computationally expensive, and depending on the context it might be cheaper to simply reload the data (note that Dataset.count without cache is normally optimized, while Dataset.count with cache is not - see Any performance issues forcing eager evaluation using count in spark?).
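The double evaluation can be made visible with an accumulator that counts how often the row-producing function runs (a sketch: the accumulator name and example Dataset are illustrative, and exact counts can vary by Spark version):

```scala
import spark.implicits._

val evals = spark.sparkContext.longAccumulator("evaluations")

val ds = spark.range(100L).map { x =>
  evals.add(1)  // incremented every time a row is actually produced
  x.toLong
}

// Eager checkpoint without cache(): one pass for the internal count,
// another for writing the checkpoint files.
val cp = ds.checkpoint()
println(s"rows produced: ${evals.value}")  // roughly 2 * 100 without cache()
```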

answered Sep 27 '22 by user10938362