DataFrame partitionBy to a single Parquet file (per partition)

I would like to repartition / coalesce my data so that it is saved into one Parquet file per partition. I would also like to use the Spark SQL partitionBy API. So I could do that like this:

    df.coalesce(1)
        .write
        .partitionBy("entity", "year", "month", "day", "status")
        .mode(SaveMode.Append)
        .parquet(s"$location")

I've tested this and it doesn't seem to perform well. This is because there is only one partition to work on in the dataset and all the partitioning, compression and saving of files has to be done by one CPU core.

I could rewrite this to do the partitioning manually (using filter with the distinct partition values for example) before calling coalesce.

But is there a better way to do this using the standard Spark SQL API?

Patrick McGloin asked Jan 14 '16 12:01

1 Answer

I had the exact same problem and I found a way to do this using DataFrame.repartition(). The problem with using coalesce(1) is that your parallelism drops to 1, and it can be slow at best and error out at worst. Increasing that number doesn't help either -- if you do coalesce(10) you get more parallelism, but end up with 10 files per partition.

To get one file per partition without using coalesce(), use repartition() with the same columns you want the output to be partitioned by. So in your case, do this:

    import spark.implicits._

    df.repartition($"entity", $"year", $"month", $"day", $"status")
        .write
        .partitionBy("entity", "year", "month", "day", "status")
        .mode(SaveMode.Append)
        .parquet(s"$location")

Once I do that I get one parquet file per output partition, instead of multiple files.
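To see why the file counts differ, here is a toy model in plain Python. It is not Spark code and it simplifies heavily: it assumes each write task emits one file for every distinct partition key it holds, and it stands in for coalesce(10) with rows spread evenly over 10 tasks. The sample rows, NUM_TASKS and the helper name files_per_partition are made up for illustration.

```python
from collections import defaultdict

NUM_TASKS = 10  # hypothetical number of write tasks


def files_per_partition(rows, key_of, task_of):
    """Count output files per partition directory in the toy model.

    A directory receives one file from every task that holds at least
    one row with that partition key.
    """
    tasks_seen = defaultdict(set)
    for index, row in enumerate(rows):
        tasks_seen[key_of(row)].add(task_of(index, row))
    return {key: len(tasks) for key, tasks in tasks_seen.items()}


def key_of(row):
    # The partition key is the (entity, status) pair of the row.
    return row[:2]


# Hypothetical data: 4 partition keys with 50 rows each.
rows = [(e, s, n) for e in ("a", "b") for s in ("ok", "err") for n in range(50)]

# Rows of every key are spread over all tasks: many files per directory.
spread = files_per_partition(rows, key_of, lambda index, row: index % NUM_TASKS)

# Rows are routed by a hash of the partition key: one task per key, one file each.
hashed = files_per_partition(rows, key_of, lambda index, row: hash(key_of(row)) % NUM_TASKS)

print(spread)
print(hashed)
```

In this model two keys can hash to the same task, but that task still writes a separate file for each key, so the count per directory stays at one. The flip side is that all rows of a very large partition end up in a single task.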

I tested this in Python, but I assume in Scala it should be the same.
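For reference, a minimal PySpark sketch of the same call chain might look like the following. The function name write_one_file_per_partition and its parameters are made up for illustration; it assumes df is a pyspark.sql.DataFrame and relies on repartition() accepting column names as strings.

```python
def write_one_file_per_partition(df, location, partition_cols):
    """Shuffle rows by the partition columns, then write partitioned Parquet.

    Assumes `df` is a pyspark.sql.DataFrame and `location` is the output path.
    """
    (
        df.repartition(*partition_cols)  # rows with equal keys land in the same task
        .write
        .partitionBy(*partition_cols)    # one output directory per key combination
        .mode("append")                  # string form of SaveMode.Append
        .parquet(location)
    )
```

With the columns from the question, the call would be write_one_file_per_partition(df, location, ["entity", "year", "month", "day", "status"]).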

mortada answered Oct 06 '22 00:10
