Why does the Spark query plan show more partitions whenever cache (persist) is used?

Given this PySpark code on a single worker Spark cluster with 2 cores:

df = spark.table('table')
df = df.dropDuplicates(['checksum'])

# caching the deduplicated DataFrame
df = df.cache()

...

df.write.save('...')

Spark generates and executes a plan with 200 partitions when df.cache() is present, and only 2 partitions when it is not.

I am particularly interested in the impact cache has on the planning in this case.

With cache: [screenshot of the query plan with df.cache()]

Without cache: [screenshot of the query plan without cache]

df.cache() seems to have a similar impact on AQE (Adaptive Query Execution): coalescing of post-shuffle partitions does not seem to happen if the DataFrame is cached after an expensive shuffle.
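To see where those numbers come from, here is a minimal PySpark sketch, reusing the table and column names from the snippet above (everything else is illustrative): dropDuplicates shuffles into spark.sql.shuffle.partitions partitions (200 by default), and explain() shows whether the shuffle is wrapped in AdaptiveSparkPlan (so AQE can coalesce it) or sits under InMemoryRelation (where, by default, it is not re-optimized).

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# dropDuplicates shuffles into spark.sql.shuffle.partitions partitions (200 by default)
print(spark.conf.get("spark.sql.shuffle.partitions"))
print(spark.conf.get("spark.sql.adaptive.enabled"))
print(spark.conf.get("spark.sql.adaptive.coalescePartitions.enabled"))

df = spark.table('table').dropDuplicates(['checksum'])
df.explain()      # with AQE enabled, the root is AdaptiveSparkPlan -> partitions can be coalesced

cached = spark.table('table').dropDuplicates(['checksum']).cache()
cached.explain()  # the plan now sits under InMemoryRelation -> AQE is not applied by default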

asked Oct 22 '25 by unvadim

1 Answer

That's a really nice question!

I dug in, and I can confirm that by default Spark with AQE behaves as you described: a relation that is cached is not optimized by AQE.

This was addressed in this ticket: https://issues.apache.org/jira/browse/SPARK-35332

The outcome of that ticket is that the behaviour was changed: we can now set spark.sql.optimizer.canChangeCachedPlanOutputPartitioning to true to allow AQE to change the partitioning of a cached plan.
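If the flag needs to be in place before anything is cached (or before the session exists), it can also be supplied when the session is built, or via spark-submit --conf. A minimal PySpark sketch, assuming Spark 3.2+ where the flag is available:

from pyspark.sql import SparkSession

# flag added by SPARK-35332, available from Spark 3.2
spark = (
    SparkSession.builder
    .config("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true")
    .getOrCreate()
)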

I ran a test on a Databricks cluster with Spark 3.2.

Sample code:

import org.apache.spark.sql.functions._
import spark.implicits._  // brings in toDF for local Seqs

// default behaviour: AQE is not allowed to change the output partitioning of a cached plan
spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", false)

val data = Seq(("1", "Frankfurt am main", "Germany"), ("1", "Frankfurt am main", "Germany"))
val df = data.toDF("Id", "City", "Country")
val uniqueRecords = df.dropDuplicates("City").cache()  // dropDuplicates forces a shuffle
uniqueRecords.show()

With the default setting I had these stages:

[screenshot: Spark UI stages with the default setting]

Physical plan:

== Physical Plan ==
CollectLimit (9)
+- InMemoryTableScan (1)
      +- InMemoryRelation (2)
            +- SortAggregate (8)
               +- Sort (7)
                  +- Exchange (6)
                     +- SortAggregate (5)
                        +- * Sort (4)
                           +- * LocalTableScan (3)

As you can see, there is no AQE at all.

Now let's try with:

spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", true)

Stages:

[screenshot: Spark UI stages with the flag enabled]

And the physical plan:

== Physical Plan ==
CollectLimit (10)
+- InMemoryTableScan (1)
      +- InMemoryRelation (2)
            +- AdaptiveSparkPlan (9)
               +- SortAggregate (8)
                  +- Sort (7)
                     +- Exchange (6)
                        +- SortAggregate (5)
                           +- Sort (4)
                              +- LocalTableScan (3)

So I can confirm that it works when you change this parameter on Spark 3.2 and above: AQE is visible in the plan and the partitions are coalesced.
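Translated back to the PySpark pipeline from the question, here is a sketch that mirrors the Databricks test above (the table name and save path are the question's own placeholders); if your environment refuses to change the flag at runtime, set it at session creation instead, as shown earlier:

spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true")

df = spark.table('table')
df = df.dropDuplicates(['checksum'])
df = df.cache()

df.explain()          # AdaptiveSparkPlan should now appear under InMemoryRelation
df.write.save('...')  # placeholder path from the question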

answered Oct 27 '25 by M_S


