I have a Spark application with several points where I would like to persist the current state. This is usually after a large step, or when caching a state that I would like to use multiple times. It appears that when I call cache on my dataframe a second time, a new copy is cached to memory. In my application, this leads to memory issues when scaling up. Even though a given dataframe is at most about 100 MB in my current tests, the cumulative size of the intermediate results grows beyond the allotted memory on the executor. See below for a small example that shows this behavior.
cache_test.py:
from pyspark import SparkContext
from pyspark.sql import HiveContext
spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)
df = (hive_context.read
      .format('com.databricks.spark.csv')
      .load('simple_data.csv')
      )
df.cache()
df.show()
df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()
spark_context.stop()
simple_data.csv:
1,2,3
4,5,6
7,8,9
Looking at the application UI, there is a copy of the original dataframe in addition to the one with the new column. I can remove the original copy by calling df.unpersist() before the withColumn line. Is this the recommended way to remove a cached intermediate result (i.e. call unpersist before every cache())?
Also, is it possible to purge all cached objects? In my application, there are natural breakpoints where I can simply purge all memory and move on to the next file. I would like to do this without creating a new Spark application for each input file.
Thank you in advance!
As for whether calling df.unpersist() before the withColumn line is the recommended way to remove a cached intermediate result: the unpersist() documentation says it
Marks the DataFrame as non-persistent, and remove all blocks for it from memory and disk.
Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.
Using the cache() and persist() methods, Spark provides an optimization mechanism to store the intermediate computation of a Spark DataFrame so that it can be reused in subsequent actions. When you persist a dataset, each node stores its partitions of that data in memory and reuses them in other actions on that dataset.
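Applied to the example above, a minimal sketch of that pattern (drop the cached original right before caching the derived DataFrame) would be:

df = (hive_context.read
      .format('com.databricks.spark.csv')
      .load('simple_data.csv')
      )
df.cache()
df.show()

# Unpersist the original before caching the derived DataFrame,
# so only one copy sits in executor memory at a time.
df.unpersist()
df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()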
Spark 2.x
You can use Catalog.clearCache:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
...
spark.catalog.clearCache()
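As a rough sketch of how this could fit the per-file breakpoints described in the question (the file names and the processing step are placeholders, not part of the original code):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('cache_test').getOrCreate()

# Hypothetical list of input files, one per natural breakpoint.
for path in ['file_a.csv', 'file_b.csv']:
    df = spark.read.csv(path)       # Spark 2.x built-in CSV reader
    df.cache()
    df.show()
    # ... further transformations and cached intermediates ...
    spark.catalog.clearCache()      # purge everything cached for this file

spark.stop()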
Spark 1.x
You can use the SQLContext.clearCache method, which

Removes all cached tables from the in-memory cache.
from pyspark.sql import SQLContext
from pyspark import SparkContext
sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())
...
sqlContext.clearCache()
We use this quite often:

for (id, rdd) in sc._jsc.getPersistentRDDs().items():
    rdd.unpersist()
    print("Unpersisted {} rdd".format(id))
where sc is a SparkContext variable.
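Wrapped in a small helper (just a sketch; unpersist_all is not a Spark API, and it relies on the same internal getPersistentRDDs call as above):

def unpersist_all(sc):
    # Unpersist every RDD the driver is still tracking as persistent.
    for (rdd_id, rdd) in sc._jsc.getPersistentRDDs().items():
        rdd.unpersist()
        print("Unpersisted {} rdd".format(rdd_id))

# e.g. call it at each natural breakpoint between input files:
unpersist_all(spark_context)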
When you call cache() on a DataFrame it behaves lazily, like a transformation: nothing is actually stored until you run an action on it such as count() or show().
In your case, after the first cache() you call show(), which is why that DataFrame ends up cached in memory. You then transform the DataFrame to add an additional column, cache the new DataFrame, and call show() again, which caches the second DataFrame in memory. If the executor only has enough memory to hold one DataFrame, caching the second one will evict the first, since there is not enough space to hold both.
Thing to keep in mind: you should not cache a DataFrame unless you use it in multiple actions; otherwise it hurts performance, since caching is itself a fairly costly operation.
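A small sketch of that guideline, reusing the example data from the question (the column names are those produced by the CSV reader above):

# Worth caching: this DataFrame feeds more than one action.
df = (hive_context.read
      .format('com.databricks.spark.csv')
      .load('simple_data.csv')
      )
df.cache()
print(df.count())   # first action materializes the cache
df.show()           # second action reuses the cached data

# Not worth caching: this derived DataFrame feeds a single action.
df_plus = df.withColumn('C1+C2', df['C1'] + df['C2'])
df_plus.show()      # caching here would only add overhead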