
Why doesn't Spark DataFrame cache work here?

I just wrote a toy class to test the Spark DataFrame API (actually Dataset, since I'm using Java):

Dataset<Row> ds = spark.sql("select id,name,gender from test2.dummy where dt='2018-12-12'");
ds = ds.withColumn("dt", lit("2018-12-17"));
ds.cache();
ds.write().mode(SaveMode.Append).insertInto("test2.dummy"); // action 1
System.out.println(ds.count());                             // action 2

As I understand it, there are two actions here: insertInto and count.

I stepped through the code in a debugger. While insertInto was running, I saw several lines like:

19/01/21 20:14:56 INFO FileScanRDD: Reading File path: hdfs://ip:9000/root/hive/warehouse/test2.db/dummy/dt=2018-12-12/000000_0, range: 0-451, partition values: [2018-12-12]

While count was running, I still saw similar log lines:

19/01/21 20:15:26 INFO FileScanRDD: Reading File path: hdfs://ip:9000/root/hive/warehouse/test2.db/dummy/dt=2018-12-12/000000_0, range: 0-451, partition values: [2018-12-12]

I have 2 questions:

1) When there are two actions on the same DataFrame like this, if I don't call ds.cache() or ds.persist() explicitly, will the second action always cause the SQL query to be re-executed?

2) If I'm reading the logs correctly, both actions trigger HDFS file reads. Does that mean ds.cache() doesn't actually work here? If so, why doesn't it work here?

Many thanks.

asked Jan 21 '19 by gfytd

People also ask

Can we cache DataFrame in Spark?

cache() can be used on a DataFrame, Dataset, or RDD when you want to perform more than one action on it. It is lazy: it marks the data to be cached in the memory of your cluster's workers, and the cache is populated when the first action runs.
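A minimal Java sketch of that pattern (the spark session and table name here are assumed for illustration, not taken from the question):

Dataset<Row> users = spark.read().table("db.users"); // hypothetical table
users.cache();          // lazy: only marks the data for caching
long n = users.count(); // first action scans storage and fills the cache
users.show(10);         // second action is served from the cache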

How do I use cache in Spark?

Caching (storage) levels in Spark:

- DISK_ONLY: persist data on disk only, in serialized format.
- MEMORY_ONLY: persist data in memory only, in deserialized format.
- MEMORY_AND_DISK: persist data in memory; if not enough memory is available, evicted blocks are stored on disk.
- OFF_HEAP: persist data in off-heap memory.
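In Java, choosing a level goes through persist() with a StorageLevel (a sketch; ds stands for any Dataset):

import org.apache.spark.storage.StorageLevel;

ds.persist(StorageLevel.MEMORY_AND_DISK()); // spill evicted blocks to disk
// For a Dataset, ds.cache() is equivalent to persist(StorageLevel.MEMORY_AND_DISK())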

How do I cache a DataFrame in Spark SQL?

Using the cache() and persist() methods, Spark provides an optimization mechanism to store the intermediate computation of a Spark DataFrame so it can be reused in subsequent actions. When you persist a dataset, each node stores its partitions of the data in memory and reuses them in other actions on that dataset.

How do I know if a data frame is cached?

You can call getStorageLevel.useMemory on the DataFrame or the RDD to find out whether the dataset is in memory.
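In Java terms, a sketch (ds is any Dataset):

boolean marked = ds.storageLevel().useMemory();          // level set via cache()/persist()
boolean onRdd = ds.rdd().getStorageLevel().useMemory();  // same check on the underlying RDD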


1 Answer

It's because you append into the table that ds is created from, so ds needs to be recomputed because the underlying data changed. In such cases, Spark invalidates the cache. See, for example, this JIRA ticket (https://issues.apache.org/jira/browse/SPARK-24596):

When invalidating a cache, we invalidate other caches dependent on this cache to ensure cached data is up to date. For example, when the underlying table has been modified or the table has been dropped itself, all caches that use this table should be invalidated or refreshed.

Try running ds.count() before inserting into the table.
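In other words, materialize the cache before the source table changes. A sketch of the reordered toy code (same query and table as in the question):

Dataset<Row> ds = spark.sql("select id,name,gender from test2.dummy where dt='2018-12-12'");
ds = ds.withColumn("dt", lit("2018-12-17"));
ds.cache();
System.out.println(ds.count()); // fills the cache while test2.dummy is still unchanged
ds.write().mode(SaveMode.Append).insertInto("test2.dummy"); // can reuse the cached data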

answered Sep 27 '22 by Raphael Roth