
Spark Parquet Loader: Reduce number of jobs involved in listing a dataframe's files

I'm loading parquet data into a dataframe via

spark.read.parquet('hdfs:///path/goes/here/...')

There are around 50k files in that path due to parquet partitioning. When I run that command, spark spawns off dozens of small jobs that as a whole take several minutes to complete. Here's what the jobs look like in the spark UI:

[Screenshot: Spark UI jobs page showing dozens of small listing jobs]

As you can see, although each job has ~2100 tasks, they execute quickly, in about 2 seconds each. Starting so many 'mini jobs' is inefficient and causes this file-listing step to take about 10 minutes, during which the cluster's resources are mostly idle and the cluster is mostly dealing with straggling tasks or the overhead of managing jobs/tasks.

How can I consolidate these tasks into fewer jobs, each with more tasks? Bonus points for a solution that also works in pyspark.

I'm running spark 2.2.1 via pyspark on hadoop 2.8.3.

asked Mar 06 '18 by conradlee

People also ask

How do I control the number of output files in spark?

Coalesce hints allow Spark SQL users to control the number of output files, just like coalesce, repartition and repartitionByRange in the Dataset API; they can be used for performance tuning and for reducing the number of output files. The "COALESCE" hint only takes a partition number as a parameter.
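For illustration only, here is a minimal PySpark sketch of the hint and its DataFrame API equivalents. Note that the COALESCE hint syntax exists only in newer Spark releases (2.4+), not in the 2.2.1 used in the question, and the table name "events" is purely hypothetical.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# "events" is a hypothetical registered table; substitute your own.
# SQL hint form (Spark 2.4+): write the result as 10 files.
hinted = spark.sql("SELECT /*+ COALESCE(10) */ * FROM events")

# DataFrame API equivalents:
merged     = spark.table("events").coalesce(10)      # merge down to 10 partitions, no full shuffle
reshuffled = spark.table("events").repartition(10)   # full shuffle into 10 partitions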

How can you improve performance of your spark application?

Persisting and caching data in memory is one of the best techniques for improving the performance of Spark workloads. Cache and persist are optimization techniques for DataFrames/Datasets in iterative and interactive Spark applications, used to improve job performance.
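As a brief sketch only: the placeholder path below is the one from the question, and "some_column" is a hypothetical column name.

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet("hdfs:///path/goes/here/...")   # placeholder path from the question

df.persist(StorageLevel.MEMORY_AND_DISK)    # or simply df.cache()

df.count()                                  # first action materializes the cache
df.groupBy("some_column").count().show()    # later actions reuse the cached data

df.unpersist()                              # free the cached blocks when done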


2 Answers

I believe you encountered a bug for which a former colleague of mine has filed a ticket and opened a pull request. You can check it out here. If it fits your issue, your best shot is probably to upvote the issue and make some noise about it on the mailing list.

What you might want to do is tweak the spark.sql.sources.parallelPartitionDiscovery.threshold and spark.sql.sources.parallelPartitionDiscovery.parallelism configuration parameters (the former is cited in the linked ticket) in a way that suits your job.

You can have a look here and here to see how these configuration keys are used. I'll share the related snippets here for completeness.

spark.sql.sources.parallelPartitionDiscovery.threshold

// Short-circuits parallel listing when serial listing is likely to be faster.
if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
  return paths.map { path =>
    (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
  }
}

spark.sql.sources.parallelPartitionDiscovery.parallelism

// Set the number of parallelism to prevent following file listing from generating many tasks
// in case of large #defaultParallelism.
val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)

The default values for these configurations are 32 for the threshold and 10000 for the parallelism (related code here).


In your case, I'd say that what you probably want to do is set the threshold so that the listing runs without spawning parallel jobs.
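To make this concrete (and since the question asked for a pyspark-friendly approach), here is a minimal sketch of how these keys could be set from PySpark. The values 100000 and 100 are arbitrary illustrations, not recommendations, and the path is the question's placeholder.

from pyspark.sql import SparkSession

# Sketch: raise the threshold above the number of paths being listed so the
# serial (driver-side) listing path is taken, and cap the parallelism used
# when the parallel listing does kick in. Values are illustrative only.
spark = (
    SparkSession.builder
    .config("spark.sql.sources.parallelPartitionDiscovery.threshold", "100000")
    .config("spark.sql.sources.parallelPartitionDiscovery.parallelism", "100")
    .getOrCreate()
)

# The same keys can also be passed on the command line, e.g.:
#   pyspark --conf spark.sql.sources.parallelPartitionDiscovery.threshold=100000

df = spark.read.parquet("hdfs:///path/goes/here/...")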

Note

The linked sources are from the latest available tagged release at the time of writing, 2.3.0.

answered by stefanobaghino

Against an object store, even the listing and the calls to getFileStatus are pretty expensive, and since this is done during partitioning, it can extend the job a lot.

Play with mapreduce.input.fileinputformat.list-status.num-threads to see if adding more threads speeds things up; try a value of 20-30.
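For example, one way to pass this Hadoop property from PySpark is via the "spark.hadoop." prefix, which Spark copies into the Hadoop Configuration it uses. This is only a sketch; the value 20 is illustrative and the path is the question's placeholder.

from pyspark.sql import SparkSession

# Sketch: properties prefixed with "spark.hadoop." are forwarded to the Hadoop
# Configuration used for file listing. The value 20 is only an illustration.
spark = (
    SparkSession.builder
    .config("spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads", "20")
    .getOrCreate()
)

df = spark.read.parquet("hdfs:///path/goes/here/...")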

answered by stevel