 

Why is repartition faster than partitionBy in Spark?

I am attempting to use Spark for a very simple use case: given a large set of files (90k) with device time-series data for millions of devices, group all of the time-series reads for a given device into a single set of files (partition). For now let's say we are targeting 100 partitions, and it is not critical that a given device's data shows up in the same output file, just the same partition.

Given this problem we've come up with two ways to do this: repartition then write, or write with partitionBy applied to the DataFrameWriter. The code for either of these is very simple:

repartition (a hash column is added to ensure that the comparison to the partitionBy code below is one-to-one):


# hash and col come from pyspark.sql.functions; num_partitions is 100 here
from pyspark.sql.functions import col, hash

spark.read.format("xml") \
  .options(rowTag="DeviceData") \
  .load(file_path, schema=meter_data) \
  .withColumn("partition", hash(col("_DeviceName")).cast("long") % num_partitions) \
  .repartition("partition") \
  .write.format("json") \
  .option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
  .mode("overwrite") \
  .save(output_path)

partitionBy:


spark.read.format("xml") \
  .options(rowTag="DeviceData") \
  .load(file_path, schema=meter_data) \
  .withColumn("partition", hash(col("_DeviceName")).cast("long") % num_partitions) \
  .write.format("json") \
  .partitionBy("partition") \
  .option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
  .mode("overwrite") \
  .save(output_path)

In our testing repartition is 10x faster than partitionBy. Why is this?

Based on my understanding, repartition will incur a shuffle, which everything I have learned about Spark says to avoid whenever possible. On the other hand, partitionBy (based on my understanding) only incurs a sort operation local to each node; no shuffle is needed. Am I misunderstanding something that is causing me to think partitionBy would be faster?

asked Nov 15 '21 by Robin Lashof-Regas


2 Answers

TL;DR: Spark triggers a sort when you call partitionBy, not a hash re-partitioning. That is why it is much slower in your case.

We can check that with a toy example:

spark.range(1000).withColumn("partition", 'id % 100)
    .repartition('partition).write.csv("/tmp/test.csv")

[DAG 1: stages of the repartition job]

Don't pay attention to the grey stage, it is skipped because it was computed in a previous job.

Then, with partitionBy:

spark.range(1000).withColumn("partition", 'id % 100)
    .write.partitionBy("partition").csv("/tmp/test2.csv")

[DAG 2: stages of the partitionBy job]

You can check that if you add a repartition before the partitionBy, the sort will still be there. So what's happening? Notice that the sort in the second DAG does not trigger a shuffle: it happens within each map partition. In fact, when you call partitionBy, Spark does not shuffle the data as one would expect at first. Spark sorts each partition individually, and then each executor writes its data into the corresponding output partitions, in separate files. Therefore, note that with partitionBy you are not writing num_partitions files but something between num_partitions and num_partitions * num_executors files: each partition directory holds one file per executor that had data belonging to that partition.
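A quick way to see this file-count behaviour is a toy run in PySpark (a minimal sketch; the local[4] master and /tmp paths are illustrative assumptions, not from the answer above):

import glob
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.master("local[4]").getOrCreate()

# 4 input tasks, each holding rows for all 100 partition values
df = spark.range(1000).withColumn("partition", col("id") % 100)
df.write.mode("overwrite").partitionBy("partition").csv("/tmp/test2.csv")

# each task writes one file per partition value it holds, so expect
# roughly 4 * 100 = 400 part files rather than 100
print(len(glob.glob("/tmp/test2.csv/partition=*/part-*")))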

answered Dec 04 '22 by Oli

I think @Oli has explained the issue perfectly in his comments to the main answer. I just want to add my 2 cents and try to explain the same.

Let's say that when you are reading the XML files [90K files], Spark reads them into N partitions. This is decided based on a number of factors such as spark.sql.files.maxPartitionBytes, the file format, the compression type, etc.

Let's assume it to be 10K partitions. This is happening in the below part of the code.

df = spark.read.format("xml") \
  .options(rowTag="DeviceData") \
  .load(file_path, schema=meter_data)
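If you want to check N on your side, the partition count and the main setting that drives it can be inspected directly (a small sketch, assuming the df from the read above):

print(df.rdd.getNumPartitions())  # the "N" discussed above
print(spark.conf.get("spark.sql.files.maxPartitionBytes"))  # 128 MB by default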

Assuming you are using num_partitions = 100, you are adding a new column called partition with values 0-99. Spark just adds a new column to the existing dataframe [or RDD], which remains split across the 10K partitions.

.withColumn("partition", hash(col("_DeviceName")).cast("long") % num_partitions)
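withColumn is a narrow transformation, so it leaves the partition layout untouched; this can be verified with a quick check (a sketch, reusing the names from the question's code):

df2 = df.withColumn("partition", hash(col("_DeviceName")).cast("long") % num_partitions)
assert df2.rdd.getNumPartitions() == df.rdd.getNumPartitions()  # still 10K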

Up to this point, both versions of the code are the same.

Now, let's compare what happens with repartition vs. partitionBy.

Case 1: repartition

.repartition("partition") \
.write.format("json") \

Here, you are repartitioning the existing dataframe based on the column "partition", which has 100 distinct values. So the existing dataframe will incur a full shuffle, bringing the number of partitions down from 10K to 100. This stage will be compute-heavy since a full shuffle is involved. It could also fail if one particular partition is really huge [a skewed partition].

But the advantage here is that in the next stage, where the write happens, Spark has to write only 100 files to the output_path. Each file will contain data for only one value of the column "partition".
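One aside worth noting (my observation, not from the answer above; verify on your Spark version): repartition("partition") hashes rows into spark.sql.shuffle.partitions buckets (200 by default), of which at most 100 are non-empty here. To pin the partition count explicitly, the column-based repartition also accepts a number:

df.repartition(num_partitions, "partition")  # exactly num_partitions output partitions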

Case 2: partitionBy

.write.format("json") \
.partitionBy("partition") \

Here, you are asking Spark to write the existing dataframe into output_path, partitioned by the distinct values of the column "partition". Nowhere are you asking Spark to reduce the existing partition count of the dataframe.

So Spark will create new folders inside the output_path and write the data corresponding to each partition inside them:

output_path + "/partition=0/"
output_path + "/partition=1/"
...
output_path + "/partition=99/"

Now, since you have 10K Spark partitions in the existing dataframe, and assuming the worst case where each of these 10K partitions contains all the distinct values of the column "partition", Spark will have to write 10K * 100 = 1M files. That is, some part of each of the 10K partitions will be written to each of the 100 folders created for the column "partition". This way Spark ends up writing 1M files to the output_path, organised into sub-directories. The advantage is that we skip a full shuffle with this method.

Now, compared to the in-memory, compute-intensive shuffle in Case 1, this will be much slower, since Spark has to create 1M files and write them to persistent storage. What's more, the files are first written to a temporary folder and then moved to the output_path.

This will be slower still if the write is happening to an object store like AWS S3 or Google Cloud Storage.

Case 3: coalesce + partitionBy

.coalesce(num_partitions) \
.write.format("json") \
.partitionBy("partition") \

In this case, you reduce the number of Spark partitions from 10K to 100 with coalesce() and write the result to output_path, partitioned by the column "partition".

So, assuming the worst case where each of these 100 partitions contains all the distinct values of the column "partition", Spark will have to write 100 * 100 = 10K files.

This will still be faster than Case 2, but slower than Case 1. Although coalesce() merges existing partitions without a full shuffle, you still end up writing 10K files to the output_path.

Case 4: repartition + partitionBy

.repartition("partition") \
.write.format("json") \
.partitionBy("partition") \

In this case, you reduce the number of Spark partitions from 10K to 100 [the distinct values of the column "partition"] with repartition() and write the result to output_path, partitioned by the column "partition".

Since each of these 100 partitions now holds exactly one distinct value of the column "partition", Spark will have to write 100 * 1 = 100 files. Each sub-folder created by partitionBy() will have only 1 file inside it.

This will take roughly the same time as Case 1, since both cases involve a full shuffle followed by writing 100 files. The only difference is that here the 100 files sit inside sub-folders under the output_path.
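The one-file-per-folder layout can be checked after the write (a sketch using the question's output_path; plain Python on the driver, assuming a local filesystem):

import glob

for d in sorted(glob.glob(output_path + "/partition=*")):
    assert len(glob.glob(d + "/part-*")) == 1, d  # one part file per sub-folder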

This setup is useful for predicate push-down (partition pruning) when reading the output_path back via Spark or Hive.
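For example (a sketch, reusing output_path from the question), a filter on the partition column shows up as a partition filter in the plan, so Spark prunes whole directories instead of scanning every file:

from pyspark.sql.functions import col

reloaded = spark.read.json(output_path)  # partition column inferred from dir names
reloaded.filter(col("partition") == 42).explain()  # look for PartitionFilters in the scan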

Conclusion:

Even though the partitionBy step itself is cheaper than a repartition (it avoids the shuffle), depending on the number of dataframe partitions and the distribution of data inside them, using partitionBy alone might end up costly overall.

answered Dec 04 '22 by Rinaz Belhaj