
When should I repartition an RDD?

I know that I can repartition an RDD to increase its partitions and use coalesce to decrease its partitions. I have two questions about this that I still cannot completely answer after reading different resources.

Spark uses a sensible default (1 partition per block, which was 64 MB in the first versions and is now 128 MB) when generating an RDD, but I have also read that it is recommended to use 2 or 3 times the number of cores running the jobs. So here are my questions:

  1. How many partitions should I use for a given file? For example, suppose I have a 10 GB .parquet file and 3 executors with 2 cores and 3 GB of memory each. Should I repartition? How many partitions should I use? What is the best way to make that choice? (A sizing sketch follows this list.)

  2. Are all data types (i.e. .txt, .parquet, etc.) repartitioned by default if no partitioning is provided?
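Here is just a sketch of how I currently think about question 1, based on the two rules of thumb above; the file path and the 128 MB per-partition target are my assumptions, not something I know to be right:

val fileSizeMB = 10 * 1024                                // the 10 GB .parquet file
val totalCores = 3 * 2                                    // 3 executors * 2 cores each
val bySize  = math.ceil(fileSizeMB / 128.0).toInt         // ~80 partitions of ~128 MB each
val byCores = totalCores * 3                              // 18 partitions (3x the cores)
val target  = math.max(bySize, byCores)                   // which of these should win?

val df = spark.read.parquet("/path/to/10gb-file.parquet") // hypothetical path
val repartitioned = df.repartition(target)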

asked Aug 18 '17 by Marcos


People also ask

When should I use repartition?

Repartition can be used to either increase or decrease the number of partitions of a DataFrame. Repartition is a full shuffle operation: all the data is taken out of the existing partitions and equally distributed into the newly formed partitions.

Why should you repartition in Spark?

The repartition() method is used to increase or decrease the number of partitions of an RDD or DataFrame in Spark. This method performs a full shuffle of the data across all the nodes. It creates partitions of roughly equal size. This is a costly operation, given that it involves moving data all over the network.

Why do we need to repartition?

Repartition is a transformation in Spark that changes the number of partitions and balances the data. It can be used to increase or decrease the number of partitions, and it always shuffles all the data over the network, so it is a fairly expensive operation.

Should I use coalesce or repartition?

Spark repartition() vs coalesce() – repartition() is used to increase or decrease the number of partitions of an RDD, DataFrame, or Dataset, whereas coalesce() is used only to decrease the number of partitions, and does so more efficiently because it avoids a full shuffle.
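A minimal sketch of the difference (the partition counts here are illustrative, not taken from the answers above):

val df = spark.range(0, 1000000)          // partition count comes from the session defaults

val wider = df.repartition(200)           // full shuffle; can increase or decrease partitions
val fewer = df.coalesce(10)               // no full shuffle; can only decrease partitions

println(wider.rdd.getNumPartitions)       // 200
println(fewer.rdd.getNumPartitions)       // 10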


1 Answer

Spark can run a single concurrent task for every partition of an RDD, up to the total number of cores in the cluster.

For example:

val rdd = sc.textFile("file.txt", 5)

The above line of code creates an RDD named rdd, read via textFile, with 5 partitions.

Suppose you have a cluster with 4 cores and that each partition takes 5 minutes to process. For the above RDD with 5 partitions, 4 partition tasks will run in parallel (one per core), and the 5th partition will start processing after 5 minutes, when one of the 4 cores becomes free.

The entire processing will complete in 10 minutes, and while the 5th partition is being processed, the remaining 3 cores sit idle.

The best way to decide on the number of partitions of an RDD is to make the number of partitions equal to the number of cores in the cluster, so that all the partitions are processed in parallel and the resources are utilized optimally.
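As a rough sketch of that advice (reusing "file.txt" from the example above and assuming the 4-core cluster):

val cores = sc.defaultParallelism          // usually the total number of cores available
val rdd   = sc.textFile("file.txt")        // partition count chosen by Spark (block based)

// repartition only if the current count does not already match the core count
val balanced = if (rdd.getNumPartitions != cores) rdd.repartition(cores) else rdd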


Question: Are all data types (i.e. .txt, .parquet, etc.) repartitioned by default if no partitioning is provided?

Every RDD gets a default number of partitions. To check it, you can call rdd.partitions.length right after the RDD is created.
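For example (the parquet file name below is just a placeholder):

val textRdd = sc.textFile("file.txt")
println(textRdd.partitions.length)                   // default, typically one partition per block

val parquetDf = spark.read.parquet("file.parquet")   // placeholder file name
println(parquetDf.rdd.getNumPartitions)              // default chosen by the Parquet reader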

To use the existing cluster resources optimally and to speed things up, we have to consider repartitioning so that all cores are utilized and all partitions have a sufficient number of records, uniformly distributed.

For better understanding, also have a look at https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-partitions.html

Note: There is no fixed formula for this. The general convention most people follow is

(number of executors * cores per executor) * factor (which may be 2 or 3, i.e. 2 or 3 times the total core count)
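Applied to the cluster in the question (3 executors with 2 cores each, taking the factor as 3), a sketch would look like this; the parquet file name is just a placeholder:

val numExecutors     = 3
val coresPerExecutor = 2
val factor           = 3                                          // "2 or 3 times" the core count
val targetPartitions = numExecutors * coresPerExecutor * factor   // 18

val tuned = spark.read.parquet("file.parquet").repartition(targetPartitions)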

answered Sep 17 '22 by Ram Ghadiyaram