How to decide on the number of partitions required for input data size and cluster resources?

My use case is as described below.

  1. Read the input data from the local file system using sparkContext.textFile(input path).
  2. Partition the input data (80 million records) using RDD.coalesce(numberOfPartitions) before submitting it to the mapper/reducer function (see the sketch below). Without coalesce() or repartition() on the input data, Spark runs very slowly and fails with an out-of-memory exception.
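A minimal sketch of that pipeline, assuming the Spark 1.0 Scala API; the input path, the partition count, and the mapper logic are placeholders, not values from the actual job:

    import org.apache.spark.{SparkConf, SparkContext}

    object PartitionSizingSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("PartitionSizingSketch")
        val sc = new SparkContext(conf)

        // 1. Read the input from the local file system (placeholder path).
        val input = sc.textFile("file:///data/input.txt")

        // 2. Repack the records into a fixed number of partitions before the
        //    mapper/reducer stage; 200 is a placeholder, not a recommendation.
        val numberOfPartitions = 200
        val partitioned = input.coalesce(numberOfPartitions, shuffle = true)

        // Stand-in for the real mapper/reducer logic.
        val result = partitioned.map(_.length).reduce(_ + _)
        println(s"total characters: $result")

        sc.stop()
      }
    }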

The issue I am facing is deciding the number of partitions to apply to the input data. The input data size varies every run, so hard-coding a particular value is not an option. Spark performs well only when a near-optimal number of partitions is applied to the input data, which I currently have to find through lots of iteration (trial and error). That is not an option in a production environment.

My question: is there a rule of thumb to decide the number of partitions required depending on the input data size and the cluster resources available (executors, cores, etc.)? If yes, please point me in that direction. Any help is much appreciated.

I am using Spark 1.0 on YARN.

Thanks, AG

asked Aug 16 '14 by user3705662

2 Answers

Two notes from the Tuning Spark page of the official Spark documentation:

1- In general, we recommend 2-3 tasks per CPU core in your cluster.

2- Spark can efficiently support tasks as short as 200 ms, because it reuses one executor JVM across many tasks and it has a low task launching cost, so you can safely increase the level of parallelism to more than the number of cores in your clusters.

These are two rules of thumb that help you estimate the number and size of partitions. So it's better to have small tasks (that can be completed within a few hundred ms).
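
As a rough illustration of how those two notes translate into a partition count, here is a hedged sketch; the executor and core numbers are assumptions you would replace with your own cluster values:

    // Assuming `rdd` is an already-loaded RDD (e.g. from sc.textFile).
    val numExecutors     = 10   // assumed cluster size
    val coresPerExecutor = 4    // assumed
    val tasksPerCore     = 3    // "2-3 tasks per CPU core" from the tuning guide

    // 10 executors * 4 cores * 3 tasks/core = 120 partitions.
    val targetPartitions = numExecutors * coresPerExecutor * tasksPerCore

    val resized = rdd.repartition(targetPartitions)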

answered Oct 04 '22 by Farzad Nozarian


Determining the number of partitions is a bit tricky. Spark will by default try to infer a sensible number of partitions. Note: if you are using the textFile method with compressed text, then Spark will disable splitting and you will need to repartition (it sounds like this might be what's happening?). With non-compressed data, when loading with sc.textFile you can also specify a minimum number of partitions (e.g. sc.textFile(path, minPartitions)).
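
For example (the paths are placeholders and the partition counts are illustrative only):

    // Uncompressed text: ask for a minimum number of partitions at load time.
    val uncompressed = sc.textFile("file:///data/input.txt", 120)

    // Compressed text (e.g. gzip) is not splittable, so it loads as one
    // partition per file and has to be repartitioned after loading.
    val compressed = sc.textFile("file:///data/input.txt.gz").repartition(120)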

The coalesce function is only used to reduce the number of partitions, so if you need to increase it you should consider using the repartition() function instead.
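
A small sketch of the difference, assuming `rdd` is an existing RDD with, say, 100 partitions:

    // coalesce (without a shuffle) can only merge partitions downward.
    val fewer = rdd.coalesce(10)       // 100 -> 10 partitions, no shuffle

    // repartition always shuffles and can move in either direction.
    val more = rdd.repartition(400)    // 100 -> 400 partitions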

As far as choosing a "good" number goes, you generally want at least as many partitions as the number of executors for parallelism. Some logic already exists to try to determine a "good" amount of parallelism, and you can get this value by calling sc.defaultParallelism.
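
One hedged way to use that value instead of hard-coding a count; the 3x multiplier follows the 2-3 tasks-per-core guideline from the other answer and is an assumption, not part of this answer:

    // Scale the cluster's default parallelism rather than hard-coding a number.
    val target = sc.defaultParallelism * 3
    val balanced = rdd.repartition(target)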

answered Oct 04 '22 by Holden