Difference between df.repartition and DataFrameWriter partitionBy?

What is the difference between DataFrame repartition() and DataFrameWriter partitionBy() methods?

I assume both are used to "partition data based on a dataframe column"? Or is there any difference?

asked Nov 04 '16 by Shankar

People also ask

What is the difference between repartition and partitionBy in spark?

repartition() is used to specify the number of partitions, taking into account the number of cores and the amount of data you have. partitionBy() is mainly used to make shuffle-based operations such as reduceByKey(), join(), and cogroup() more efficient.
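As a rough sketch of that RDD-level usage (key names and the partition count are illustrative, assuming a running SparkSession):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])

    # Hash-partition the pair RDD up front; equal keys are co-located,
    # so the reduceByKey that follows needs no additional shuffle.
    sums = pairs.partitionBy(8).reduceByKey(lambda x, y: x + y)
    print(sums.collect())  # e.g. [('a', 4), ('b', 2)]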

What is the difference between partition by and repartition?

repartition() creates a specified number of partitions in memory. partitionBy() writes files to disk, one per combination of memory partition and partition-column value.

What does partitionBy do in PySpark?

PySpark partitionBy() is used to partition data based on column values while writing a DataFrame to a disk/file system. When you write a DataFrame to disk with partitionBy(), PySpark splits the records based on the partition column and stores each partition's data in a sub-directory.
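A minimal sketch of that layout (column name and output path are illustrative):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [("2016-11-01", 1), ("2016-11-02", 2)],
        ["day", "value"],
    )

    # Writes one sub-directory per distinct 'day' value, e.g.
    #   /tmp/out/day=2016-11-01/part-*.parquet
    #   /tmp/out/day=2016-11-02/part-*.parquet
    df.write.partitionBy("day").parquet("/tmp/out")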

What is difference between repartition and coalesce?

Spark repartition() vs coalesce(): repartition() is used to increase or decrease the number of partitions of an RDD, DataFrame, or Dataset, whereas coalesce() is used only to decrease the number of partitions, in an efficient way.
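For example (a sketch; the partition counts are the point):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.range(100)

    # repartition() does a full shuffle and can grow or shrink the count.
    print(df.repartition(8).rdd.getNumPartitions())              # 8

    # coalesce() only merges existing partitions: it can shrink the count
    # cheaply, but it will not grow it.
    print(df.repartition(8).coalesce(2).rdd.getNumPartitions())   # 2
    print(df.repartition(8).coalesce(16).rdd.getNumPartitions())  # still 8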


2 Answers

Watch out: I believe the accepted answer is not quite right! I'm glad you asked this question, because the behavior of these similarly named functions differs in important and unexpected ways that are not well documented in the official Spark documentation.

The first part of the accepted answer is correct: calling df.repartition(COL, numPartitions=k) will create a dataframe with k partitions using a hash-based partitioner. COL here defines the partitioning key; it can be a single column or a list of columns. The hash-based partitioner takes each input row's partition key and hashes it into a space of k partitions via something like partition = hash(partitionKey) % k. This guarantees that all rows with the same partition key end up in the same partition. However, rows with different partition keys can also end up in the same partition (when their keys collide in the hash space), and some partitions may be empty.

In summary, the unintuitive aspects of df.repartition(COL, numPartitions=k) are that

  • partitions will not strictly segregate partition keys
  • some of your k partitions may be empty, whereas others may contain rows from multiple partition keys
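A small experiment makes both points visible (a sketch; the exact key-to-partition assignment depends on Spark's hash function):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    # 7 distinct keys hashed into k=5 partitions.
    df = spark.range(1000).withColumn("day", (F.col("id") % 7).cast("string"))
    parts = df.repartition(5, "day")

    # Partitions holding several keys show distinct_days > 1; partition
    # ids missing from the output are empty.
    (parts.withColumn("pid", F.spark_partition_id())
          .groupBy("pid")
          .agg(F.countDistinct("day").alias("distinct_days"))
          .show())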

The behavior of df.write.partitionBy is quite different, in a way that many users won't expect. Let's say that you want your output files to be date-partitioned, and your data spans 7 days. Let's also assume that df has 10 partitions to begin with. When you run df.write.partitionBy('day'), how many output files should you expect? The answer is 'it depends'. If each of your 10 starting partitions in df contains data from every day, then the answer is 70. If each of your starting partitions contains data from exactly one day, then the answer is 10.

How can we explain this behavior? When you run df.write, each of the original partitions in df is written independently. That is, each of your original 10 partitions is sub-partitioned separately on the 'day' column, and a separate file is written for each sub-partition.
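Here is a sketch of the 70-file scenario (paths and row counts are illustrative):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    # 7 days of data, round-robin-distributed over 10 starting partitions,
    # so every partition contains rows from every day.
    df = (spark.range(700)
               .withColumn("day", (F.col("id") % 7).cast("string"))
               .repartition(10))

    # Each of the 10 partitions is sub-partitioned by 'day' independently:
    # expect roughly 10 x 7 = 70 part files across the 7 day= directories.
    df.write.partitionBy("day").parquet("/tmp/by_day")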

I find this behavior rather annoying and wish there were a way to do a global repartitioning when writing dataframes.
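The closest workaround I know of (a sketch, not a built-in option of the writer) is to repartition on the same column immediately before writing, so that all rows for a given day sit in a single in-memory partition:

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.range(700).withColumn("day", (F.col("id") % 7).cast("string"))

    # Each day's rows land in one in-memory partition first, so each
    # day= directory should receive a single part file.
    df.repartition("day").write.partitionBy("day").parquet("/tmp/by_day_compact")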

answered Sep 24 '22 by conradlee


If you run repartition(COL), you change the partitioning during calculations: you will get spark.sql.shuffle.partitions (default: 200) partitions. If you then call .write, you will get one directory with many files.
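For example (a sketch; some_col is a placeholder column, and adaptive query execution may coalesce the shuffle partitions on newer Spark versions):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.range(100).withColumn("some_col", F.col("id") % 3)

    print(spark.conf.get("spark.sql.shuffle.partitions"))     # '200' by default
    print(df.repartition("some_col").rdd.getNumPartitions())  # 200 with that default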

If you run .write.partitionBy(COL), then as a result you will get as many directories as there are unique values in COL. This speeds up further data reading (if you filter by the partitioning column) and saves some space in storage (the partitioning column is removed from the data files).
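Illustrated (directory names follow Spark's COL=value convention; the path is a placeholder):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([("a", 1), ("b", 2)], ["COL", "value"])

    # Produces one directory per distinct COL value:
    #   /tmp/output/COL=a/part-*.parquet
    #   /tmp/output/COL=b/part-*.parquet
    # COL itself is not stored inside the data files.
    df.write.partitionBy("COL").parquet("/tmp/output")

    # A filter on COL can skip whole directories (partition pruning).
    spark.read.parquet("/tmp/output").filter("COL = 'a'").show()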

UPDATE: See @conradlee's answer. He explains in detail not only what the directory structure will look like after applying the different methods, but also the resulting number of files in both scenarios.

answered Sep 24 '22 by Mariusz