I am working on a Spark ML pipeline where we get OOM errors on larger data sets. Before training we were using cache(); I swapped this out for checkpoint() and our memory requirements went down significantly. However, the docs for RDD's checkpoint() say:

"It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation."

The same guidance is not given for Dataset's checkpoint, which is what I am using. Following the above advice anyway, I found that the memory requirements actually increased slightly compared to using cache() alone.
My expectation was that when we do

...
ds.cache()
ds.checkpoint()
...

the call to checkpoint forces evaluation of the Dataset, which is cached at the same time before being checkpointed. Afterwards, any reference to ds would use the cached partitions, and if more memory is required and those partitions are evicted, the checkpointed partitions would be used rather than re-evaluating them.

Is this true, or does something different happen under the hood? Ideally I'd like to keep the Dataset in memory if possible, but it seems there is no benefit whatsoever, from a memory standpoint, to the cache-and-checkpoint approach.
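For context, the surrounding setup looks roughly like this (the path and assembleFeatures/rawData are illustrative stand-ins for our actual pipeline):

spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")  // illustrative path; required for reliable checkpoints
val ds = assembleFeatures(rawData)                              // hypothetical helper: the expensive feature-engineering chain
ds.cache()
ds.checkpoint()
// training proceeds with ds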
TL;DR You won't benefit from in-memory cache (the default storage level for Dataset is MEMORY_AND_DISK anyway) in subsequent actions, but you should still consider caching if computing ds is expensive.
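In other words (a sketch; ds is the Dataset from the question):

import org.apache.spark.storage.StorageLevel

ds.persist(StorageLevel.MEMORY_AND_DISK)  // what ds.cache() already does for a Dataset: partitions spill to local disk when memory runs out
// ds.persist(StorageLevel.MEMORY_ONLY)   // a purely in-memory level would have to be requested explicitly instead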
Explanation
Your expectation that

ds.cache()
ds.checkpoint()
...

the call to checkpoint forces evaluation of the Dataset

is correct. Dataset.checkpoint comes in different flavors, which allow for both eager and lazy checkpointing, and the default variant is eager:

def checkpoint(): Dataset[T] = checkpoint(eager = true, reliableCheckpoint = true)

Therefore subsequent actions should reuse the checkpoint files.
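For completeness, the public variants look roughly like this (eager is the default; localCheckpoint exists in newer Spark releases):

val reliable     = ds.checkpoint()               // eager: runs a job right away and writes the checkpoint files
val reliableLazy = ds.checkpoint(eager = false)  // lazy: files are written when the returned Dataset is first used in an action
val local        = ds.localCheckpoint()          // newer releases: keeps blocks on the executors instead of reliable storage

Note that checkpoint returns a new Dataset; only actions on the returned Dataset read from the checkpoint files, so the result should be assigned and used in place of the original ds.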
However, under the covers Spark simply applies checkpoint to the internal RDD, so the rules of evaluation haven't changed: Spark evaluates the action first, and then creates the checkpoint (that's why caching was recommended in the first place).
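For reference, this is the plain-RDD pattern the quoted recommendation is about (a sketch; rdd stands for any expensive RDD, and a checkpoint directory is assumed to be set with sc.setCheckpointDir):

rdd.cache()       // keep the computed partitions around ...
rdd.checkpoint()  // ... because this only marks the RDD for checkpointing
rdd.count()       // the action computes rdd; a follow-up job then reads the (cached) partitions and writes the checkpoint files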
So if you omit ds.cache(), ds will be evaluated twice in ds.checkpoint() (a rough way to observe this is sketched below):

- once for the internal count,
- once for the actual checkpoint.
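A rough way to see the double evaluation (assuming a SparkSession named spark and a checkpoint directory already configured; accumulator values from transformations are approximate, but that is exactly what shows the extra pass):

import spark.implicits._                          // for the Encoder used by map

val evals = spark.sparkContext.longAccumulator("evaluations")
val tracked = spark.range(0L, 1000L).map { n => evals.add(1); n.longValue }  // bumps the accumulator once per row per evaluation
tracked.checkpoint()   // eager checkpoint with no prior cache()
println(evals.value)   // roughly 2000: rows are computed once for the internal count and again when the checkpoint files are written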
Therefore nothing changed and cache is still recommended, although the recommendation might be slightly weaker than for a plain RDD, as Dataset cache is considered computationally expensive and, depending on the context, it might be cheaper to simply reload the data (note that Dataset.count without cache is normally optimized, while Dataset.count with cache is not - see "Any performance issues forcing eager evaluation using count in spark?").
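A hedged illustration of that last point:

ds.count()   // without caching, Spark can often answer this with an aggressively pruned/aggregated plan
ds.cache()
ds.count()   // with cache() in effect, the count also has to materialize every column into the in-memory cache first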