I'm loading parquet data into a dataframe via
spark.read.parquet('hdfs:///path/goes/here/...')
There are around 50k files in that path due to parquet partitioning. When I run that command, Spark spawns dozens of small jobs that, taken together, take several minutes to complete. Here's what the jobs look like in the Spark UI:
As you can see, although each job has ~2100 tasks, they execute quickly, in about 2 seconds. Starting so many 'mini jobs' is inefficient and causes this file-listing step to take about 10 minutes, during which the cluster's resources are mostly idle and the cluster is mostly dealing with straggling tasks or the overhead of managing jobs/tasks.
How can I consolidate these tasks into fewer jobs, each with more tasks? Bonus points for a solution that also works in pyspark.
I'm running Spark 2.2.1 via pyspark on Hadoop 2.8.3.
I believe you have encountered a bug for which a former colleague of mine filed a ticket and opened a pull request. You can check it out here. If it fits your issue, your best shot is probably to vote the issue up and make some noise on the mailing list about it.
What you might want to do is tweak the spark.sql.sources.parallelPartitionDiscovery.threshold and spark.sql.sources.parallelPartitionDiscovery.parallelism configuration parameters (the former is the one cited in the linked ticket) in a way that suits your job.
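In PySpark, both keys can be set on an existing session before the read is triggered; here is a minimal sketch, with the values being placeholders rather than tuned recommendations:

# Set before calling spark.read.parquet(...); both values are placeholders.
spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "100000")
spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.parallelism", "100")
df = spark.read.parquet('hdfs:///path/goes/here/...')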
You can have a look here and here to see how these configuration keys are used; I'll share the related snippets here for completeness.
spark.sql.sources.parallelPartitionDiscovery.threshold
// Short-circuits parallel listing when serial listing is likely to be faster.
if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
  return paths.map { path =>
    (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
  }
}
spark.sql.sources.parallelPartitionDiscovery.parallelism
// Set the number of parallelism to prevent following file listing from generating many tasks
// in case of large #defaultParallelism.
val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)
The default values are 32 for the threshold and 10000 for the parallelism (related code here).
In your case, I'd say that what you probably want to do is set the threshold so that the process runs without spawning parallel jobs.
The linked sources are from the latest available tagged release at the time of writing, 2.3.0.
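Applied to your case, forcing the serial code path shown above means raising the threshold above the number of directories Spark has to list at any level. A sketch of doing that at session creation in PySpark, where 100000 is an illustrative guess rather than a tuned value:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Raise the threshold above the number of paths to list so the serial
    # short-circuit is taken and no parallel listing jobs are spawned.
    .config("spark.sql.sources.parallelPartitionDiscovery.threshold", "100000")
    .getOrCreate()
)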
Against an object store, even the listing and the calls to getFileStatus are pretty expensive, and as this is done during partitioning, it can extend the job a lot.
Play with mapreduce.input.fileinputformat.list-status.num-threads to see if adding more threads speeds things up, say with a value of 20-30.
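From PySpark, one way to pass that Hadoop setting through is the spark.hadoop.* prefix when the session is built; a sketch, using the 20 threads suggested above as a starting point:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Entries prefixed with "spark.hadoop." are copied into the Hadoop
    # Configuration used by the input listing code.
    .config("spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads", "20")
    .getOrCreate()
)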