Un-persisting all dataframes in (py)spark

I have a Spark application with several points where I would like to persist the current state. This is usually after a large step, or when caching a state that I would like to use multiple times. It appears that when I call cache on my dataframe a second time, a new copy is cached to memory. In my application, this leads to memory issues when scaling up. Even though a given dataframe is at most about 100 MB in my current tests, the cumulative size of the intermediate results grows beyond the allotted memory on the executor. See below for a small example that shows this behavior.

cache_test.py:

from pyspark import SparkContext, HiveContext

spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)

df = (hive_context.read
      .format('com.databricks.spark.csv')
      .load('simple_data.csv')
     )
df.cache()
df.show()

df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()

spark_context.stop()

simple_data.csv:

1,2,3
4,5,6
7,8,9

Looking at the application UI, there is a copy of the original dataframe, in addition to the one with the new column. I can remove the original copy by calling df.unpersist() before the withColumn line. Is this the recommended way to remove cached intermediate results (i.e. call unpersist() before every cache())?

Also, is it possible to purge all cached objects? In my application, there are natural breakpoints where I can simply purge all memory and move on to the next file. I would like to do this without creating a new Spark application for each input file.

Thank you in advance!

bjack3 asked Apr 28 '16 05:04


3 Answers

Spark 2.x

You can use Catalog.clearCache:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
...
spark.catalog.clearCache()

Spark 1.x

You can use the SQLContext.clearCache method, which:

Removes all cached tables from the in-memory cache.

from pyspark.sql import SQLContext
from pyspark import SparkContext

sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())
...
sqlContext.clearCache()
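
For the "natural breakpoints" between input files described in the question, the call could be wrapped roughly as follows. This is only a sketch, assuming Spark 2.x: process_files and the process_one callback are hypothetical names, not part of PySpark; the only Spark call used is spark.catalog.clearCache().

```python
def process_files(spark, paths, process_one):
    """Call process_one(spark, path) for each path and clear the cache after each file.

    `spark` is assumed to be a SparkSession (Spark 2.x).
    `process_one` is a hypothetical user-supplied callback that does the real work.
    """
    results = []
    for path in paths:
        try:
            results.append(process_one(spark, path))
        finally:
            # Natural breakpoint: drop everything cached through this session
            # before moving on to the next file, even if processing failed.
            spark.catalog.clearCache()
    return results
```

With this shape, process_one should return plain values (a row count, an output path) rather than a cached dataframe, because anything it cached is cleared before the next file starts. On Spark 1.x the same idea would apply with sqlContext.clearCache().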

zero323 answered Oct 16 '22 22:10


We use this quite often:

for (id, rdd) in sc._jsc.getPersistentRDDs().items():
    rdd.unpersist()
    print("Unpersisted {} rdd".format(id))

where sc is a SparkContext variable. Note that _jsc is an internal attribute of SparkContext, so this relies on a non-public API.

Tagar answered Oct 17 '22 00:10


When you call cache() on a dataframe, it is evaluated lazily, like a transformation: nothing is actually cached until you perform an action on it, such as count() or show().

In your case, you call show() after the first cache(), which is why the dataframe is cached in memory. You then apply another transformation to add a column, cache the new dataframe, and call show() again, which caches the second dataframe in memory as well. If the available memory is only large enough to hold one dataframe, then caching the second dataframe evicts the first one from memory, because there is not enough space to hold both.

Thing to keep in mind: you should not cache a dataframe unless you use it in multiple actions; otherwise it adds performance overhead, since caching is itself a costly operation.
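
Given that caching only happens on an action, one way to swap a cached dataframe for its successor without holding both longer than needed might look like the sketch below. replace_cached is a hypothetical helper name; it uses only the DataFrame methods cache(), count() and unpersist(), and it assumes new_df was derived from old_df.

```python
def replace_cached(old_df, new_df):
    """Cache new_df, materialize it, then release old_df. Returns new_df.

    Hypothetical helper: assumes both arguments are Spark DataFrames and that
    new_df was built from old_df (for example with withColumn).
    """
    new_df.cache()      # lazy: only marks new_df for caching
    new_df.count()      # action: fills the new cache, reading from old_df's cache
    old_df.unpersist()  # the old copy is no longer needed
    return new_df
```

In the question's example this would be used as df = replace_cached(df, df.withColumn('C1+C2', df['C1'] + df['C2'])). count() is chosen here because it touches every partition; show() only needs a few rows, so it may leave the new cache partially filled. Both copies still coexist briefly while the new cache is being built.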

Nikunj Kakadiya answered Oct 16 '22 22:10