Given this PySpark code on a single-worker Spark cluster with 2 cores:
df = spark.table('table')
df = df.dropDuplicates(['checksum'])
#
df = df.cache()
...
df.write.save('...')
Spark generates and executes a plan with 200 partitions when the cache() call is present, and only 2 partitions when df.cache() is not present.
I am particularly interested in the impact cache() has on query planning in this case.
With cache: [plan/stages screenshot showing 200 partitions]

Without cache: [plan/stages screenshot showing 2 partitions]

df.cache() seems to have a similar impact on AQE (Adaptive Query Execution): coalescing of post-shuffle partitions does not seem to occur if the DataFrame is cached after an expensive shuffle.
That's a really nice question!
I dug in, and I can confirm that by default Spark with AQE behaves as you described: a relation that is cached is not optimized by AQE.
This was addressed in this ticket: https://issues.apache.org/jira/browse/SPARK-35332
The outcome of that ticket is that this behavior was changed: we can now set spark.sql.optimizer.canChangeCachedPlanOutputPartitioning to true to allow AQE to change the partitioning of a plan while it is being cached.
I ran a test on a Databricks cluster with Spark 3.2.
Sample code:
import org.apache.spark.sql.functions._
import spark.implicits._  // needed for toDF if you run this outside a notebook/shell
spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", false)
val data = Seq(("1", "Frankfurt am main", "Germany"), ("1", "Frankfurt am main", "Germany"))
val df = data.toDF("Id", "City", "Country")
val uniqueRecords = df.dropDuplicates("City").cache()
uniqueRecords.show()
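The physical plans shown below can be reproduced with explain in formatted mode (an assumption on my side; the same numbered-operator output also appears in the Details section of the SQL tab in the Spark UI):
// prints the plan with numbered operators, as pasted below
uniqueRecords.explain("formatted")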
With the default setting I got these stages:

Physical plan:
== Physical Plan ==
CollectLimit (9)
+- InMemoryTableScan (1)
      +- InMemoryRelation (2)
            +- SortAggregate (8)
               +- Sort (7)
                  +- Exchange (6)
                     +- SortAggregate (5)
                        +- * Sort (4)
                           +- * LocalTableScan (3)
As you can see, there is no AQE at all.
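You can also see the missing coalescing without reading the plan, just by checking the partition count of the cached DataFrame (a quick check, assuming the default spark.sql.shuffle.partitions of 200):
// with canChangeCachedPlanOutputPartitioning = false the cached relation keeps
// the full shuffle partitioning, even though it holds only one row after dedup
println(uniqueRecords.rdd.getNumPartitions)  // 200 with the default spark.sql.shuffle.partitions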
Now let's try with:
spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", true)
Stages:

And the physical plan:
== Physical Plan ==
CollectLimit (10)
+- InMemoryTableScan (1)
      +- InMemoryRelation (2)
            +- AdaptiveSparkPlan (9)
               +- SortAggregate (8)
                  +- Sort (7)
                     +- Exchange (6)
                        +- SortAggregate (5)
                           +- Sort (4)
                              +- LocalTableScan (3)
So I can confirm that it works when you change this parameter on Spark 3.2 and above: AdaptiveSparkPlan is visible in the plan and the partitions are coalesced.
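For completeness, a small sketch of the settings involved and a quick way to verify the effect (assuming Spark 3.2+, where AQE and post-shuffle coalescing are already enabled by default):
// both of these default to true since Spark 3.2
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", true)
// this is the switch discussed above (false by default in Spark 3.2)
spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", true)
// re-running the sample above with these settings, the cached DataFrame
// should end up with far fewer partitions than the default 200
println(uniqueRecords.rdd.getNumPartitions)
Note that the config has to be set before the DataFrame is cached and materialized, otherwise the already-cached 200-partition data is reused.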