I am exploring Spark's behavior when joining a table to itself. I am using Databricks.
My dummy scenario is:
Read an external table as dataframe A (the underlying files are in Delta format)
Define dataframe B as dataframe A with only certain columns selected
Join dataframes A and B on column1 and column2
(Yes, it doesn't make much sense, I'm just experimenting to understand Spark's underlying mechanics)
from pyspark.sql.functions import col, concat, lit, lower

a = spark.read.table("table") \
    .select("column1", "column2", "column3", "column4", "column5") \
    .withColumn("columnA", lower(concat(col("column4"), lit("_"), col("column5"))))

b = a.select("column1", "column2", "columnA")

c = a.join(b, how="left", on=["column1", "column2"])
My first attempt was to run the code as-is (attempt 1). I then tried to repartition and cache (attempt 2):
a = spark.read.table("table") \
    .select("column1", "column2", "column3", "column4", "column5") \
    .withColumn("columnA", lower(concat(col("column4"), lit("_"), col("column5")))) \
    .repartition(col("column1"), col("column2")) \
    .cache()
Finally, I repartitioned, sorted within partitions, and cached (attempt 3):
a = spark.read.table("table") \
    .select("column1", "column2", "column3", "column4", "column5") \
    .withColumn("columnA", lower(concat(col("column4"), lit("_"), col("column5")))) \
    .repartition(col("column1"), col("column2")) \
    .sortWithinPartitions(col("column1"), col("column2")) \
    .cache()
The DAGs generated for each attempt are attached.
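For reference, the same information can be read from the query plan without the UI screenshots; this is just the standard DataFrame API applied to the c dataframe from attempt 1.

c.explain(True)           # prints the parsed, analyzed, optimized and physical plans
c.explain("formatted")    # Spark 3.x: a more readable, node-per-line physical plan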
My questions are:
1. Why, in attempt 1, does the table appear to be cached even though caching was not explicitly requested?
2. Why is an InMemoryTableScan node always followed by another node of the same type?
3. Why, in attempt 3, does caching appear to take place in two stages?
4. Why, in attempt 3, does WholeStageCodegen follow one (and only one) InMemoryTableScan?
By caching you create something like a checkpoint inside your Spark application: if any task fails further down the execution, Spark can rebuild the lost RDD partitions from the cache instead of recomputing them from the original source.
Remember to call unpersist() once the cached data is no longer needed. If the caching layer fills up, Spark starts evicting data from memory using an LRU (least recently used) strategy, so it is good practice to unpersist explicitly and stay in control of what gets evicted.
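A minimal sketch of that cache/unpersist lifecycle (the DataFrame below is synthetic, not from your table):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.range(1_000_000).withColumnRenamed("id", "column1")

df.cache()                           # lazily marks the data for caching
df.count()                           # the first action materializes the cache
df.groupBy().sum("column1").show()   # reuses the cached partitions

df.unpersist()                       # release the memory yourself instead of waiting for LRU eviction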
If you have to perform an operation that requires a shuffle before the join, such as aggregateByKey or reduceByKey, you can avoid a second shuffle for the join itself by passing a hash partitioner with the same number of partitions as an explicit argument to that first operation, as sketched below.
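A minimal sketch of that idea at the RDD level (the RDDs, keys and partition count below are made up for illustration):

from pyspark.sql import SparkSession
from pyspark.rdd import portable_hash

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
num_parts = 8

events = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
lookup = sc.parallelize([("a", "x"), ("b", "y")]).partitionBy(num_parts, portable_hash)

# Passing the partition count (and hash partitioner) to reduceByKey means its output
# is already hash-partitioned, so the join below reuses that partitioning instead of
# shuffling this side a second time.
totals = events.reduceByKey(lambda x, y: x + y, numPartitions=num_parts, partitionFunc=portable_hash)

joined = totals.join(lookup, numPartitions=num_parts)
print(joined.collect())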
What you are observing in these three plans is a mixture of Databricks Runtime behaviour and plain Spark behaviour.
First of all, when running Databricks Runtime 3.3+, caching is automatically enabled for all Parquet files.
The corresponding config for that is:
spark.databricks.io.cache.enabled true
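You can check or toggle it from a notebook with the standard conf API; nothing here is Databricks-specific beyond the key itself, and the get falls back to a default in case the key is not set on your cluster:

spark.conf.get("spark.databricks.io.cache.enabled", "false")   # inspect the current setting
spark.conf.set("spark.databricks.io.cache.enabled", "true")    # enable the IO cache for this session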
For your second question, InMemoryTableScan appears twice because, when the join was called, Spark tried to compute Dataset A and Dataset B in parallel. Assuming different executors were assigned those tasks, both had to scan the table from the (Databricks) cache.
For the third one, InMemoryTableScan does not by itself mean caching is happening at that point. It just means that whatever plan Catalyst produced involves scanning the cached table multiple times.
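If you want to confirm what Spark itself considers cached, independent of how the plan nodes are named, you can ask it directly; a small sketch, reusing the a dataframe and the table name from the question:

print(a.storageLevel)                    # StorageLevel with all flags False unless a was explicitly cached
print(spark.catalog.isCached("table"))   # True only if the table was cached via CACHE TABLE / spark.catalog.cacheTable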
PS: I can't visualize point 4 :)