So, I understand that in general one should use coalesce() when the number of partitions decreases due to a filter or some other operation that may reduce the original dataset (RDD, DF). coalesce() is useful for running operations more efficiently after filtering down a large dataset.

I also understand that it is less expensive than repartition, as it reduces shuffling by moving data only if necessary. My problem is how to define the parameter that coalesce takes (idealPartionionNo). I am working on a project that was passed to me from another engineer, and he was using the calculation below to compute the value of that parameter.
// DEFINE OPTIMAL PARTITION NUMBER
implicit val NO_OF_EXECUTOR_INSTANCES = sc.getConf.getInt("spark.executor.instances", 5)
implicit val NO_OF_EXECUTOR_CORES = sc.getConf.getInt("spark.executor.cores", 2)
// REPARTITION_FACTOR is a constant defined elsewhere in the project
val idealPartionionNo = NO_OF_EXECUTOR_INSTANCES * NO_OF_EXECUTOR_CORES * REPARTITION_FACTOR
This is then used with a partitioner object:
val partitioner = new HashPartitioner(idealPartionionNo)
but also used with:
RDD.filter(x=>x._3<30).coalesce(idealPartionionNo)
Is this the right approach? What is the main idea behind the idealPartionionNo value computation? What is the REPARTITION_FACTOR? How do I generally work to define it?

Also, since YARN is responsible for identifying the available executors on the fly, is there a way of getting that number (AVAILABLE_EXECUTOR_INSTANCES) on the fly and using it to compute idealPartionionNo (i.e. replacing NO_OF_EXECUTOR_INSTANCES with AVAILABLE_EXECUTOR_INSTANCES)?
Ideally, I would appreciate some actual examples of the form: given n executors with m cores and a partition factor equal to k, what would the ideal number of partitions be?

Also, if you can refer me to a nice blog that explains these, I would really appreciate it.
The best way to decide on the number of partitions in an RDD is to make the number of partitions equal to the number of cores in the cluster, so that all the partitions are processed in parallel and the resources are utilized optimally.
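As a rough sketch of that rule of thumb (the configuration keys and fallback values mirror the question's snippet and are assumptions, as is the input path):

// A minimal sketch: size the partition count to the total number of cores granted.
val executors    = sc.getConf.getInt("spark.executor.instances", 5)
val coresPerExec = sc.getConf.getInt("spark.executor.cores", 2)
val totalCores   = executors * coresPerExec

val data  = sc.textFile("hdfs:///some/input/path")   // hypothetical input
val sized = if (data.getNumPartitions > totalCores) data.coalesce(totalCores)
            else data.repartition(totalCores)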
The coalesce method reduces the number of partitions in a DataFrame. Coalesce avoids a full shuffle: instead of creating new partitions, it merges data into existing partitions, which means it can only decrease the number of partitions.

Spark RDD coalesce() is used only to reduce the number of partitions. It is an optimized version of repartition(), where the movement of data across partitions is lower.

For example, when going from partitions spread over 5 executors down to 2, coalesce will not move the data already sitting on 2 of the executors; it only moves the data from the remaining 3 executors onto those 2, thereby avoiding a full shuffle. Because of this, partition sizes can vary by a high degree. Since a full shuffle is avoided, coalesce is more performant than repartition.
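To see this merging behavior on toy data (a hedged sketch, numbers made up):

val rdd    = sc.parallelize(1 to 1000, 6)     // 6 initial partitions
val merged = rdd.coalesce(2)                  // merge down to 2, no full shuffle
println(merged.getNumPartitions)              // 2
println(merged.glom().map(_.length).collect().toList)  // sizes of the merged partitions; no rebalancing is done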
In practice, the optimal number of partitions depends more on the data you have, the transformations you use, and the overall configuration than on the available resources.

If you use actions that transfer a significant amount of data to the driver (reduce, in contrast to treeReduce), a large number of partitions results in a higher load on the driver.

You can find a number of rules which suggest oversubscribing partitions compared to the number of cores (a factor of 2 or 3 seems to be common), or keeping partitions at a certain size, but this doesn't take into account your own code.
In my opinion:
Don't try to use a fixed number of partitions based on the number of executors or cores. First understand your data and code, then adjust the configuration to reflect your understanding.

Usually, it is relatively easy to determine the amount of raw data per partition for which your cluster exhibits stable behavior (in my experience it is somewhere in the range of a few hundred megabytes, depending on the format, the data structure you use to load the data, and the configuration). This is the "magic number" you're looking for.
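A hedged sketch of turning that "magic number" into a partition count; the target size and the Hadoop filesystem call are assumptions about how you might estimate the input size:

import org.apache.hadoop.fs.{FileSystem, Path}

val targetPartitionBytes = 256L * 1024 * 1024   // assumed target: ~256 MB per partition

// Estimate the raw input size from the filesystem (hypothetical input path).
val fs         = FileSystem.get(sc.hadoopConfiguration)
val inputBytes = fs.getContentSummary(new Path("/data/input")).getLength

val numPartitions = math.max(1, (inputBytes / targetPartitionBytes).toInt)
val rdd = sc.textFile("/data/input").coalesce(numPartitions)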
Some things you have to remember in general:
Partitioning by key (*byKey, join, RDD.partitionBy, Dataset.repartition) can result in non-uniform data distribution. Always monitor your jobs for symptoms of a significant data skew.

Some operations (union, coGroup, join) can affect the number of partitions.

Your question is a valid one, but Spark partitioning optimization depends entirely on the computation you're running. You need to have a good reason to repartition/coalesce; if you're just counting an RDD (even one with a huge number of sparsely populated partitions), then any repartition/coalesce step is just going to slow you down.
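For example (a sketch with a hypothetical input path), if all you do is count, the extra step only hurts:

val filtered = sc.textFile("/data/input").filter(_.nonEmpty)
filtered.count()                 // fine as-is, even with many sparsely populated partitions
filtered.coalesce(100).count()   // per the point above, this coalesce only slows the count down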
The difference between repartition(n) (which is the same as coalesce(n, shuffle = true)) and coalesce(n, shuffle = false) has to do with the execution model. The shuffle model takes each partition in the original RDD, randomly sends its data around to all executors, and results in an RDD with the new (smaller or greater) number of partitions. The no-shuffle model creates a new RDD which loads multiple partitions as one task.
Let's consider this computation:
sc.textFile("massive_file.txt")
.filter(sparseFilterFunction) // leaves only 0.1% of the lines
.coalesce(numPartitions, shuffle = shuffle)
If shuffle is true, then the text file / filter computations happen in a number of tasks given by the defaults in textFile, and the tiny filtered results are shuffled. If shuffle is false, then the number of total tasks is at most numPartitions.
If numPartitions is 1, then the difference is quite stark. The shuffle model will process and filter the data in parallel, then send the 0.1% of filtered results to one executor for downstream DAG operations. The no-shuffle model will process and filter the data all on one core from the beginning.
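A hedged sketch of those two extremes on toy data (the predicate and sizes are made up):

val lines = sc.parallelize(1 to 1000000, 100).map(_.toString)

// Shuffle model: the filter runs as 100 parallel tasks, then only the tiny
// surviving fraction is shuffled down to a single partition.
val viaRepartition = lines.filter(_.endsWith("999")).repartition(1)

// No-shuffle model: at most 1 task in total, so the filter itself runs on one core.
val viaCoalesce = lines.filter(_.endsWith("999")).coalesce(1)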
Consider your downstream operations. If you're just using this dataset once, then you probably don't need to repartition at all. If you are saving the filtered RDD for later use (to disk, for example), then consider the tradeoffs above. It takes experience to become familiar with these models and when one performs better, so try both out and see how they perform!
As others have answered, there is no formula which calculates what you ask for. That said, you can make an educated guess on the first part and then fine-tune it over time.
The first step is to make sure you have enough partitions. If you have NO_OF_EXECUTOR_INSTANCES executors and NO_OF_EXECUTOR_CORES cores per executor, then you can process NO_OF_EXECUTOR_INSTANCES * NO_OF_EXECUTOR_CORES partitions at the same time (each would go to a specific core of a specific instance). That said, this assumes everything is divided equally between the cores and everything takes exactly the same time to process. This is rarely the case. There is a good chance that some tasks will finish before others, either because of locality (e.g. the data needs to come from a different node) or simply because they are not balanced (e.g. if your data is partitioned by root domain, then partitions including google would probably be quite big). This is where the REPARTITION_FACTOR comes into play. The idea is that we "overbook" each core, so if one task finishes very quickly and another finishes slowly, we have the option of dividing the work between them. A factor of 2-3 is generally a good idea.
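As a concrete example of that first step (the values are just the fallbacks from the question's snippet, and the factor of 3 is an assumption):

val executors         = sc.getConf.getInt("spark.executor.instances", 5)
val coresPerExecutor  = sc.getConf.getInt("spark.executor.cores", 2)
val repartitionFactor = 3   // "overbooking" factor; 2-3 is the rule of thumb above

// e.g. 5 executors * 2 cores * factor 3 = 30 partitions with the fallback values
val idealPartionionNo = executors * coresPerExecutor * repartitionFactor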
Now let's take a look at the size of a single partition. Let's say your entire dataset is X MB in size and you have N partitions. Each partition would then be X/N MB on average. If N is large relative to X, you might end up with a very small average partition size (e.g. a few KB). In this case it is usually a good idea to lower N, because the overhead of managing each partition becomes too high. On the other hand, if the size is very large (e.g. a few GB), then you need to hold a lot of data in memory at the same time, which causes issues such as garbage collection pressure and high memory usage.

The optimal size is a good question, but generally people seem to prefer partitions of 100-1000 MB; in truth, tens of MB would probably also be fine.
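A worked example of that X/N arithmetic (all numbers assumed):

// Suppose X = 50 GB of input:
//   N = 5000 partitions  ->  50 * 1024 MB / 5000 ≈ 10 MB each   (on the small side)
//   N = 200 partitions   ->  50 * 1024 MB / 200  = 256 MB each  (inside the 100-1000 MB range)
val totalMB        = 50L * 1024
val numPartitions  = 200
val avgPartitionMB = totalMB / numPartitions   // 256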
Another thing you should note when doing this calculation is how your partitions change. For example, let's say you start with 1000 partitions of 100 MB each, but after filtering the data each partition shrinks to 1 KB; then you should probably coalesce. Similar issues can happen when you do a groupBy or join. In such cases both the size of each partition and the number of partitions change, and they might reach an undesirable size.
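A hedged sketch of that scenario (the sizes and the filter predicate are assumptions):

// Start with ~1000 partitions of ~100 MB each (assumed), filter aggressively,
// then shrink the partition count so downstream stages don't run thousands of near-empty tasks.
val big     = sc.textFile("/data/huge")
val tiny    = big.filter(_.contains("rare-token"))   // leaves only a tiny fraction of the rows
val compact = tiny.coalesce(20)                      // far fewer, reasonably sized partitions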