 

Spark parquet partitioning : Large number of files

I am trying to leverage spark partitioning. I was trying to do something like

data.write.partitionBy("key").parquet("/location") 

The issue here is that each partition creates a huge number of parquet files, which results in slow reads when I try to read from the root directory.

To avoid that I tried

data.coalesce(numPart).write.partitionBy("key").parquet("/location") 

This, however, creates numPart parquet files in each partition. My partition sizes differ, so I would ideally like a separate coalesce per partition. That doesn't look like an easy thing: I would need to visit each partition, coalesce it to a certain number of files, and store it at a separate location.

How should I use partitioning to avoid many files after write?

asked Jun 28 '17 by Avishek Bhattacharya

People also ask

Is it better to have one large Parquet file or lots of smaller Parquet files?

Parquet files, and for that matter files in general, should be larger than the HDFS block size (128 MB by default). We use the coalesce function with a Hive context and 50 executors for one of our files, which is ~15 GB, and it runs like a charm.

Can Parquet files be partitioned?

An ORC or Parquet file contains data columns. To these files you can add partition columns at write time. The data files do not store values for partition columns; instead, when writing the files you divide them into groups (partitions) based on column values.
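For illustration, here is a hedged sketch (the dataset, column names, and output path are made up) showing how a partition column ends up in the directory layout rather than inside the data files:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("partition-layout").getOrCreate()
import spark.implicits._

// Toy dataset; "country" will become the partition column.
val sales = Seq(
  ("2017-06-01", "US", 10.0),
  ("2017-06-01", "DE", 20.0),
  ("2017-06-02", "US", 15.0)
).toDF("day", "country", "amount")

sales.write.partitionBy("country").parquet("/tmp/sales")

// Resulting layout (roughly): the parquet files under each directory
// contain only the day and amount columns, not country.
//   /tmp/sales/country=US/part-....parquet
//   /tmp/sales/country=DE/part-....parquet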

How big should Spark partitions be?

The ideal size of each partition is around 100-200 MB. Smaller partitions increase the number of tasks that can run in parallel, which can improve performance, but partitions that are too small cause overhead and increase GC time.
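As a rough illustration (the size estimate below is a made-up figure, not something Spark reports directly), you could derive a partition count from an estimated data size:

// Hypothetical sizing sketch: target roughly 128 MB per partition.
val totalSizeBytes = 50L * 1024 * 1024 * 1024        // assumed ~50 GB of data
val targetPartitionBytes = 128L * 1024 * 1024        // ~128 MB per partition
val numPartitions = math.max(1, (totalSizeBytes / targetPartitionBytes).toInt)

val rebalanced = data.repartition(numPartitions)     // data as in the question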


2 Answers

First, I would really avoid using coalesce, as it is often pushed further up the chain of transformations and may destroy the parallelism of your job (I asked about this issue here: Coalesce reduces parallelism of entire stage (spark)).

Writing 1 file per parquet partition is relatively easy (see Spark dataframe write method writing many small files):

data.repartition($"key").write.partitionBy("key").parquet("/location") 

If you want to set an arbitrary number of files (or files which all have the same size), you need to further repartition your data using another attribute (I cannot tell you what this might be in your case):

data.repartition($"key",$"another_key").write.partitionBy("key").parquet("/location") 

another_key could be another attribute of your dataset, or a derived attribute using some modulo or rounding operations on existing attributes. You could even use window functions with row_number over key and then round this with something like

data.repartition($"key",floor($"row_number"/N)*N).write.partitionBy("key").parquet("/location") 

This would put N records into each parquet file.
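For completeness, here is a hedged sketch of how the row_number column above could be derived with a window function before repartitioning (the value of N and the ordering column "id" are assumptions):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, floor, row_number}

val N = 10000                                            // assumed records per file
val byKey = Window.partitionBy("key").orderBy("id")      // "id" is an assumed ordering column

data
  .withColumn("row_number", row_number().over(byKey))
  .repartition(col("key"), floor(col("row_number") / N)) // bucket index within each key
  .write.partitionBy("key").parquet("/location")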

Using orderBy

You can also control the number of files without repartitioning by ordering your dataframe accordingly:

data.orderBy($"key").write.partitionBy("key").parquet("/location") 

This will lead to a total of (at least, but not much more than) spark.sql.shuffle.partitions files across all partitions (by default 200). It's even beneficial to add a second ordering column after $"key", as parquet will remember the ordering of the dataframe and will write the statistics accordingly. For example, you can order by an ID:

data.orderBy($"key",$"id").write.partitionBy("key").parquet("/location") 

This will not change the number of files, but it will improve the performance when you query your parquet file for a given key and id. See e.g. https://www.slideshare.net/RyanBlue3/parquet-performance-tuning-the-missing-guide and https://db-blog.web.cern.ch/blog/luca-canali/2017-06-diving-spark-and-parquet-workloads-example
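For example, a read like the following (the filter values are made up, and spark is assumed to be an active SparkSession) can benefit from that ordering:

import org.apache.spark.sql.functions.col

// Partition pruning handles the key filter; the parquet min/max statistics
// for the sorted id column let whole row groups be skipped for the id filter.
val result = spark.read.parquet("/location")
  .filter(col("key") === "some_key" && col("id").between(1000, 2000))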

Spark 2.2+

From Spark 2.2 on, you can also play with the new option maxRecordsPerFile to limit the number of records per file if your files are too large. You will still get at least N files if you have N partitions, but you can split the file written by one partition (task) into smaller chunks:

df.write
  .option("maxRecordsPerFile", 10000)
  ... 

See e.g. http://www.gatorsmile.io/anticipated-feature-in-spark-2-2-max-records-written-per-file/ and spark write to disk with N files less than N partitions
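Putting it together, a hedged sketch (the output path is illustrative) that groups all rows of a key into the same task, as in the first snippet above, while capping each output file:

import org.apache.spark.sql.functions.col

// All rows of a key are written by one task, but each output file
// is capped at 10000 records.
df.repartition(col("key"))
  .write
  .option("maxRecordsPerFile", 10000)
  .partitionBy("key")
  .parquet("/location")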

answered Oct 03 '22 by Raphael Roth

Let's expand on Raphael Roth's answer with an additional approach that'll create an upper bound on the number of files each partition can contain, as discussed in this answer:

import org.apache.spark.sql.functions.rand

df.repartition(numPartitions, $"some_col", rand)
  .write.partitionBy("some_col")
  .parquet("partitioned_lake") 
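As a usage note (the figure is illustrative): with numPartitions = 100, each distinct some_col value is spread by the random column over at most 100 shuffle partitions, so each partition directory contains at most 100 files, and the total file count is bounded by 100 times the number of distinct some_col values.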
answered Oct 03 '22 by Powers