Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark: Explicit caching can interfere with Catalyst optimizer's ability to optimize some queries?

I'm studying to take the data bricks to spark certification exam, and their practice exam ( please see > https://databricks-prod-cloudfront.cloud.databricks.com/public/793177bc53e528530b06c78a4fa0e086/0/6221173/100020/latest.html ) requires us to accept this statement as true fact:

"Explicit caching can decrease application performance by interfering with the Catalyst optimizer's ability to optimize some queries"

I got this question wrong even though I have read up a lot on the catalyst and have a pretty good grasp of the details. So I wanted to shore up my knowledge of this topic and go to the source which explains the how's and why's behind this assertion.

Can anyone provide guidance about this? Specifically, why is this so? and how do we ensure that when we cache our datasets we are not actually getting in the way of the optimizer and making things worse? /Thanks!

like image 420
Chris Bedford Avatar asked Jan 25 '23 23:01

Chris Bedford


1 Answers

How and why can a cache decrease the performances ?

Let's use a simple example to demonstrate that :

// Some data
val df = spark.range(100)

df.join(df, Seq("id")).filter('id <20).explain(true)

Here, the catalyst plan will optimize this join by doing a filter on each dataframe before joining, to reduce the amount of data that will get shuffled.

== Optimized Logical Plan ==
Project [id#0L]
+- Join Inner, (id#0L = id#69L)
   :- Filter (id#0L < 20)
   :  +- Range (0, 100, step=1, splits=Some(4))
   +- Filter (id#69L < 20)
      +- Range (0, 100, step=1, splits=Some(4))

If we cache the query after the join, the query won't be as optimized, as we can see here :

df.join(df, Seq("id")).cache.filter('id <20).explain(true)

== Optimized Logical Plan ==
Filter (id#0L < 20)
+- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
      +- *Project [id#0L]
         +- *BroadcastHashJoin [id#0L], [id#74L], Inner, BuildRight
            :- *Range (0, 100, step=1, splits=4)
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
               +- *Range (0, 100, step=1, splits=4)

The filter is done at the very end ...

Why so ? Because a cache writes on the disk the dataframe. So every consequent queries will use this cached / written on disk DataFrame, and so it will optimize only the part of the query AFTER the cache. We can check that with the same example !

df.join(df, Seq("id")).cache.join(df, Seq("id")).filter('id <20).explain(true)

== Optimized Logical Plan ==
Project [id#0L]
+- Join Inner, (id#0L = id#92L)
   :- Filter (id#0L < 20)
   :  +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   :        +- *Project [id#0L]
   :           +- *BroadcastHashJoin [id#0L], [id#74L], Inner, BuildRight
   :              :- *Range (0, 100, step=1, splits=4)
   :              +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   :                 +- *Range (0, 100, step=1, splits=4)
   +- Filter (id#92L < 20)
      +- Range (0, 100, step=1, splits=Some(4))

The filter is done before the second join, but after the first one because it is cached.

How to avoid ?

By knowing what you do ! You can simply compares catalyst plans and see what optimizations Spark is missing.

like image 88
BlueSheepToken Avatar answered Jan 29 '23 08:01

BlueSheepToken