Apache Spark: impact of repartitioning, sorting and caching on a join

I am exploring Spark's behavior when joining a table to itself. I am using Databricks.

My dummy scenario is:

  1. Read an external table as dataframe A (underlying files are in delta format)

  2. Define dataframe B as dataframe A with only certain columns selected

  3. Join dataframes A and B on column1 and column2

(Yes, it doesn't make much sense; I'm just experimenting to understand Spark's underlying mechanics.)

from pyspark.sql.functions import col, concat, lit, lower

a = spark.read.table("table") \
    .select("column1", "column2", "column3", "column4") \
    .withColumn("columnA", lower(concat(col("column4"), lit("_"), col("column5"))))

b = a.select("column1", "column2", "columnA")

c = a.join(b, how="left", on=["column1", "column2"])

My first attempt was to run the code as it is (attempt 1). I then tried to repartition and cache (attempt 2)

a = spark.read.table("table") \
    .select("column1", "column2", "column3", "column4") \
    .withColumn("columnA", lower(concat(col("column4"), lit("_"), col("column5")))) \
    .repartition(col("column1"), col("column2")).cache()

Finally, I repartitioned, sorted and cached (attempt 3)

a = spark.read.table("table") \
    .select("column1", "column2", "column3", "column4") \
    .withColumn("columnA", lower(concat(col("column4"), lit("_"), col("column5")))) \
    .repartition(col("column1"), col("column2")) \
    .sortWithinPartitions(col("column1"), col("column2")).cache()

The respective DAGs generated are attached.
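Besides the Spark UI screenshots, the physical plan for each attempt can also be printed directly from the notebook; a minimal sketch, using the dataframes defined above:

# Print the parsed, analyzed, optimized and physical plans for the join,
# so nodes such as InMemoryTableScan and WholeStageCodegen can be compared
# across the three attempts without screenshots.
c.explain(True)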

My questions are:

  1. Why in attempt 1 does the table appear to be cached even though caching has not been explicitly specified?

  2. Why is InMemoryTableScan always followed by another node of this type?

  3. Why in attempt 3 does caching appear to take place in two stages?

  4. Why in attempt 3 does WholeStageCodegen follow one (and only one) InMemoryTableScan?

[Spark UI DAG screenshots for attempt 1, attempt 2 and attempt 3]

Asked Jan 03 '20 by Dawid



1 Answer

What you are observing in these three plans is a mixture of the Databricks runtime and Spark itself.

First of all, when running Databricks Runtime 3.3+, caching is automatically enabled for all Parquet files. The corresponding config is: spark.databricks.io.cache.enabled true
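A minimal sketch for checking or toggling this from a notebook (spark.conf.get/set are standard Spark APIs; the config key is the one mentioned above):

# Check whether the Databricks IO cache is enabled on this cluster.
print(spark.conf.get("spark.databricks.io.cache.enabled"))

# It can be disabled for the current session to see how the plans look without it.
spark.conf.set("spark.databricks.io.cache.enabled", "false")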

For your second question, InMemoryTableScan happens twice because when the join is called, Spark tries to compute Dataset A and Dataset B in parallel. Assuming different executors got assigned those tasks, both have to scan the table from the (Databricks) cache.

For the third one, an InMemoryTableScan node does not by itself indicate caching; it just means that whatever plan Catalyst formed involves scanning the cached table multiple times.
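You can see the same effect with an explicit cache(); a minimal sketch reusing the asker's dataframe a (the column names are the placeholders from the question):

# Materialise a in Spark's storage-level cache, then join it with a projection
# of itself. Every re-read of the cached data shows up in the physical plan as
# an InMemoryTableScan node, even though the data itself is only cached once.
a_cached = a.cache()
b = a_cached.select("column1", "column2", "columnA")
c = a_cached.join(b, how="left", on=["column1", "column2"])
c.explain(True)  # look for one InMemoryTableScan per scan of the cached relation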

PS: I can't visualize point 4 :)

Answered Sep 28 '22 by Ashvjit Singh