
How to split parquet files into many partitions in Spark?

So I have just one parquet file that I'm reading with Spark (using the SQL stuff) and I'd like it to be processed with 100 partitions. I've tried setting spark.default.parallelism to 100, and we have also tried changing the compression of the parquet to none (from gzip). No matter what we do, the first stage of the Spark job has only a single partition (once a shuffle occurs it gets repartitioned into 100, and thereafter things are obviously much, much faster).

Now according to a few sources (like below) parquet should be splittable (even if using gzip!), so I'm super confused and would love some advice.

https://www.safaribooksonline.com/library/view/hadoop-application-architectures/9781491910313/ch01.html

I'm using Spark 1.0.0, and apparently the default value for spark.sql.shuffle.partitions is 200, so it can't be that. In fact, all the parallelism defaults are much greater than 1, so I don't understand what's going on.

asked Nov 28 '14 by samthebest

People also ask

Can Parquet files be partitioned?

An ORC or Parquet file contains data columns. Partition columns can be added to these files at write time. The data files themselves do not store values for partition columns; instead, when writing the files you divide them into groups (partitions) based on column values.

Can Parquet data be partitioned across multiple nodes?

Yes. You can use the repartition method to adjust the number of tasks so that it is in balance with the available resources. You also need to define the number of executors per node, the number of nodes, and the memory per node when submitting the application, so that the tasks execute in parallel and utilise the maximum resources.

How do I increase the number of partitions in Spark?

If you want to increase the number of partitions of your DataFrame, all you need to run is the repartition() function. It returns a new DataFrame partitioned by the given partitioning expressions; the resulting DataFrame is hash partitioned.
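As a minimal sketch of the above, in Scala against a running SparkSession (the input path and the target count of 100 are illustrative placeholders):

```scala
// Sketch: increase a DataFrame's partition count with repartition().
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("repartition-example").getOrCreate()

val df = spark.read.parquet("/data/input.parquet")
println(df.rdd.getNumPartitions)            // often 1 for a single small file

// Hash-partition the data into 100 partitions (this triggers a shuffle)
val repartitioned = df.repartition(100)
println(repartitioned.rdd.getNumPartitions) // 100
```

Note that repartition() only helps downstream stages; the initial read stage is still governed by how the input file splits.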

How many partitions we get when we create Spark DataFrame by reading Parquet file stored in HDFS location?

On the HDFS cluster, by default, Spark creates one Partition for each block of the file.


2 Answers

You should write your parquet files with a smaller block size. The default is 128 MB per block, but it's configurable by setting the parquet.block.size configuration on the writer.

The source of ParquetOutputFormat is here, if you want to dig into details.

The block size is the minimum amount of logically readable data in a parquet file (since parquet is columnar, you can't just split it by line or anything similarly trivial), so you can't have more reading threads than input blocks.
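A rough sketch of setting this when rewriting the file, assuming a modern SparkSession API (the paths and the 16 MB value are illustrative, not prescriptive):

```scala
// Sketch: rewrite a Parquet file with a smaller block size so that readers
// get more input splits. parquet.block.size is read by the Parquet writer
// via the Hadoop configuration.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("small-blocks").getOrCreate()

// Use 16 MB row groups instead of the 128 MB default (illustrative value)
spark.sparkContext.hadoopConfiguration
  .setInt("parquet.block.size", 16 * 1024 * 1024)

val df = spark.read.parquet("/data/input.parquet")
df.write.parquet("/data/output-small-blocks.parquet")
```

The trade-off: smaller blocks mean more parallelism on read, but less efficient columnar compression and more metadata overhead per file.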

answered Sep 25 '22 by C4stor


The new way of doing it (Spark 2.x) is setting

spark.sql.files.maxPartitionBytes

Source: https://issues.apache.org/jira/browse/SPARK-17998 (the official documentation is not yet correct; it omits the .sql prefix)

From my experience, the Hadoop settings no longer have any effect.
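A minimal sketch of the above for Spark 2.x+ (the path and the 16 MB cap are illustrative placeholders):

```scala
// Sketch: cap the bytes packed into each input partition so that a single
// large Parquet file is split across many read tasks.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("max-partition-bytes")
  // 16 MB per input partition instead of the 128 MB default (illustrative)
  .config("spark.sql.files.maxPartitionBytes", (16 * 1024 * 1024).toString)
  .getOrCreate()

val df = spark.read.parquet("/data/big-file.parquet")
// The read stage now gets roughly fileSize / 16 MB partitions,
// subject to Parquet row-group boundaries.
println(df.rdd.getNumPartitions)
```

Because this setting applies at read time, it avoids having to rewrite the file with a smaller block size, though splits still can't be finer than the file's row groups.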

answered Sep 22 '22 by F Pereira