I just wrote a toy class to test a Spark DataFrame (actually a Dataset, since I'm using Java):
Dataset<Row> ds = spark.sql("select id,name,gender from test2.dummy where dt='2018-12-12'");
ds = ds.withColumn("dt", lit("2018-12-17"));
ds.cache();
ds.write().mode(SaveMode.Append).insertInto("test2.dummy");
//
System.out.println(ds.count());
As I understand it, there are 2 actions here: "insertInto" and "count".
I debugged the code step by step. When running "insertInto", I see several lines like:
19/01/21 20:14:56 INFO FileScanRDD: Reading File path: hdfs://ip:9000/root/hive/warehouse/test2.db/dummy/dt=2018-12-12/000000_0, range: 0-451, partition values: [2018-12-12]
When running "count", I still see similar logs:
19/01/21 20:15:26 INFO FileScanRDD: Reading File path: hdfs://ip:9000/root/hive/warehouse/test2.db/dummy/dt=2018-12-12/000000_0, range: 0-451, partition values: [2018-12-12]
I have 2 questions:
1) When there are 2 actions on the same dataframe like above, if I don't call ds.cache or ds.persist explicitly, will the 2nd action always cause the sql query to be re-executed?
2) If I understand the log correctly, both actions trigger HDFS file reads. Does that mean ds.cache() doesn't actually work here? If so, why not?
Many thanks.
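cache() is an Apache Spark method that can be used on a DataFrame, Dataset, or RDD when you want to perform more than one action on it. It marks the DataFrame, Dataset, or RDD to be kept on your cluster's workers. It is lazy, so the data is only stored when the first action computes it. By default, a DataFrame or Dataset is cached at MEMORY_AND_DISK and an RDD at MEMORY_ONLY.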
Caching methods in Spark:
DISK_ONLY: Persist data on disk only, in serialized format.
MEMORY_ONLY: Persist data in memory only, in deserialized format.
MEMORY_AND_DISK: Persist data in memory; if not enough memory is available, evicted blocks are stored on disk.
OFF_HEAP: Data is persisted in off-heap memory.
With the cache() and persist() methods, Spark provides an optimization mechanism to store the intermediate computation of a Spark DataFrame so it can be reused in subsequent actions. When you persist a dataset, each node stores its partitions of the data and reuses them in other actions on that dataset.
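To make the reuse described above concrete, here is a minimal sketch in plain Java. It is a toy model, not Spark code: the LazyDataset class, its methods, and the sample rows are all hypothetical and only mimic the idea that each action recomputes the source unless the result was marked for caching.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Toy model in plain Java (NOT Spark): every "action" recomputes the
// source unless the dataset was marked with cache() beforehand.
class LazyCacheDemo {

    static final class LazyDataset {
        private final Supplier<List<String>> source; // stands in for the SQL query / file scan
        private boolean cacheRequested = false;
        private List<String> cached = null;

        LazyDataset(Supplier<List<String>> source) {
            this.source = source;
        }

        // Like cache(): only marks the dataset, nothing is computed yet.
        LazyDataset cache() {
            cacheRequested = true;
            return this;
        }

        // Every action goes through here.
        private List<String> compute() {
            if (cached != null) {
                return cached;            // reuse the stored result
            }
            List<String> rows = source.get(); // "re-execute the query"
            if (cacheRequested) {
                cached = rows;            // materialized by the first action
            }
            return rows;
        }

        long count() { return compute().size(); }

        String first() { return compute().get(0); }
    }

    // Runs two actions and returns how many times the source was scanned.
    static int scansForTwoActions(boolean useCache) {
        AtomicInteger scans = new AtomicInteger();
        LazyDataset ds = new LazyDataset(() -> {
            scans.incrementAndGet();
            return Arrays.asList("alice", "bob", "carol");
        });
        if (useCache) {
            ds.cache();
        }
        ds.count();
        ds.first();
        return scans.get();
    }

    public static void main(String[] args) {
        System.out.println("scans without cache: " + scansForTwoActions(false)); // 2
        System.out.println("scans with cache:    " + scansForTwoActions(true));  // 1
    }
}
```

In this model, two actions without cache() scan the source twice, while with cache() the second action reuses what the first one stored.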
You can call storageLevel().useMemory() on the Dataset, or getStorageLevel().useMemory() on the RDD, to find out whether it is marked to be stored in memory.
It's because you append into the table that ds is created from, so ds needs to be recomputed because the underlying data changed. In such cases, Spark invalidates the cache. See e.g. this Jira (https://issues.apache.org/jira/browse/SPARK-24596):
When invalidating a cache, we invalid other caches dependent on this cache to ensure cached data is up to date. For example, when the underlying table has been modified or the table has been dropped itself, all caches that use this table should be invalidated or refreshed.
Try running ds.count() before inserting into the table, so that the count has already been computed when the insert invalidates the cache.
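The effect of the ordering can be illustrated with a small toy model in plain Java. This is only a sketch under simplifying assumptions, not Spark's actual cache manager: the InvalidationDemo class, the table name, and the row counts are hypothetical. It assumes that reading a table populates a cache entry and that writing to the same table drops it.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model in plain Java (NOT Spark's cache manager): a cached read of a
// table is dropped as soon as that table is written to.
class InvalidationDemo {
    private final Map<String, Integer> tableRows = new HashMap<>(); // table -> row count
    private final Map<String, Integer> cache = new HashMap<>();     // table -> cached read
    private int scans = 0;                                          // actual table reads

    InvalidationDemo(String table, int rows) {
        tableRows.put(table, rows);
    }

    // Serve from the cache if possible, otherwise scan the table and cache it.
    private int read(String table) {
        Integer hit = cache.get(table);
        if (hit != null) {
            return hit;
        }
        scans++;
        int rows = tableRows.get(table);
        cache.put(table, rows);
        return rows;
    }

    int count(String table) {
        return read(table);
    }

    // Reads the dataset, appends it to the same table, then invalidates the
    // cache entry because the underlying data changed.
    void insertIntoSelf(String table) {
        int rows = read(table);
        tableRows.merge(table, rows, Integer::sum);
        cache.remove(table);
    }

    int scanCount() {
        return scans;
    }

    // Order used in the question: insert first, then count.
    static int scansInsertThenCount() {
        InvalidationDemo d = new InvalidationDemo("dummy", 3);
        d.insertIntoSelf("dummy");
        d.count("dummy");
        return d.scanCount();
    }

    // Suggested order: count first, then insert.
    static int scansCountThenInsert() {
        InvalidationDemo d = new InvalidationDemo("dummy", 3);
        d.count("dummy");
        d.insertIntoSelf("dummy");
        return d.scanCount();
    }

    public static void main(String[] args) {
        System.out.println("insert then count: " + scansInsertThenCount() + " scans"); // 2
        System.out.println("count then insert: " + scansCountThenInsert() + " scans"); // 1
    }
}
```

In this model, inserting first forces the later count to scan the table again, while counting first lets the insert read from the cache, so the table is scanned only once.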