I have a single Parquet file that I'm reading with Spark (using Spark SQL), and I'd like it to be processed with 100 partitions. I've tried setting spark.default.parallelism
to 100, and we have also tried changing the compression of the Parquet file from gzip to none. No matter what we do, the first stage of the Spark job has only a single partition (once a shuffle occurs it gets repartitioned into 100, and from then on things are obviously much faster).
Now according to a few sources (like below) parquet should be splittable (even if using gzip!), so I'm super confused and would love some advice.
https://www.safaribooksonline.com/library/view/hadoop-application-architectures/9781491910313/ch01.html
I'm using spark 1.0.0, and apparently the default value for spark.sql.shuffle.partitions
is 200, so it can't be that. In fact all the defaults for parallelism are much more than 1, so I don't understand what's going on.
An ORC or Parquet file contains data columns. To these files you can add partition columns at write time. The data files do not store values for partition columns; instead, when writing the files you divide them into groups (partitions) based on column values.
Yes, you can use the repartition method to reduce the number of tasks so that it is in balance with the available resources. You also need to define the number of executors per node, the number of nodes, and the memory per node when submitting the application, so that the tasks execute in parallel and make maximum use of the resources.
How to increase the number of partitions: if you want to increase the partitions of your DataFrame, all you need to run is the repartition() function. It returns a new DataFrame partitioned by the given partitioning expressions; the resulting DataFrame is hash partitioned.
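To make "hash partitioned" concrete, here is a minimal pure-Python sketch of the idea. It is not Spark code: the hash_partition helper and the sample rows are hypothetical, and crc32 is only a stand-in for whatever hash function Spark uses internally.

```python
import zlib
from collections import defaultdict


def hash_partition(rows, key, num_partitions):
    """Assign each row to a partition by hashing the value of its key column."""
    partitions = defaultdict(list)
    for row in rows:
        # crc32 is a stand-in chosen for determinism; it is an assumption,
        # not the hash function Spark actually applies.
        h = zlib.crc32(str(row[key]).encode("utf-8"))
        partitions[h % num_partitions].append(row)
    return partitions


# Hypothetical data: 1000 rows spread over 100 partitions by their "id" column.
rows = [{"id": i, "value": i * 10} for i in range(1000)]
parts = hash_partition(rows, "id", 100)
sizes = [len(parts[p]) for p in range(100)]
print(sum(sizes))  # every row lands in exactly one partition
```

Rows with the same key always end up in the same partition, which is why this step requires a shuffle and does not help with the parallelism of the initial read.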
On an HDFS cluster, by default, Spark creates one partition for each block of the file.
You should write your Parquet files with a smaller block size. The default is 128 MB per block, but it is configurable by setting the parquet.block.size
configuration in the writer.
See the source of ParquetOutputFormat if you want to dig into the details.
The block size is the minimum amount of data you can read out of a Parquet file that is logically readable (since Parquet is columnar, you can't just split by line or something trivial like that), so you can't have more reading threads than input blocks.
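As a rough back-of-the-envelope illustration of that limit, the sketch below estimates the upper bound on parallel readers as one per block. The max_read_tasks helper and the 100 MB file size are hypothetical, and real block boundaries are only approximately equal to the configured size.

```python
import math

MB = 1024 * 1024


def max_read_tasks(file_size_bytes, block_size_bytes):
    """Upper bound on parallel readers: one per block in the file."""
    return max(1, math.ceil(file_size_bytes / block_size_bytes))


file_size = 100 * MB  # hypothetical file size

# With the default 128 MB block size the whole file fits in one block.
print(max_read_tasks(file_size, 128 * MB))

# Rewriting the file with 1 MB blocks allows many more reading tasks.
print(max_read_tasks(file_size, 1 * MB))
```

This would explain a first stage with a single partition: a file smaller than one block cannot be read by more than one task, whatever the parallelism settings say.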
The new way of doing it (Spark 2.x) is setting
spark.sql.files.maxPartitionBytes
Source: https://issues.apache.org/jira/browse/SPARK-17998 (the official documentation is not correct yet; it is missing the .sql)
From my experience, Hadoop settings no longer have effect.
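For a feel of how spark.sql.files.maxPartitionBytes affects the read, here is a pure-Python sketch of the split-size calculation as I understand it for Spark 2.x. Treat it as an assumption rather than the actual implementation: the max_split_bytes helper is hypothetical, and the 128 MB and 4 MB defaults (for spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes) should be checked against your Spark version.

```python
import math

MB = 1024 * 1024


def max_split_bytes(total_bytes, num_files, default_parallelism,
                    max_partition_bytes=128 * MB, open_cost_bytes=4 * MB):
    """Approximate the target split size for file-based sources (assumed logic)."""
    # Each file is padded with an assumed fixed "open cost" before dividing by cores.
    padded_total = total_bytes + num_files * open_cost_bytes
    bytes_per_core = padded_total // default_parallelism
    return min(max_partition_bytes, max(open_cost_bytes, bytes_per_core))


file_size = 1024 * MB  # hypothetical single 1 GB file

# Lowering maxPartitionBytes to 8 MB caps the split size at 8 MB.
split = max_split_bytes(file_size, 1, 100, max_partition_bytes=8 * MB)
print(split // MB, math.ceil(file_size / split))
```

In a real session the setting would typically be applied before the read, for example with spark.conf.set("spark.sql.files.maxPartitionBytes", ...). Note that for Parquet the splits still have to line up with the blocks inside the file, so a smaller value does not help if the file contains only one block.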