I have gone through various articles about hash partitioning, but I still don't understand in what scenarios it is more advantageous than range partitioning. Using sortByKey followed by range partitioning allows data to be distributed evenly across the cluster, but that may not be the case with hash partitioning. Consider the following example:
Suppose a pair RDD has the keys [8, 96, 240, 400, 401, 800] and the desired number of partitions is 4.
In this case, hash partitioning distributes the keys among the partitions as follows:
partition 0: [8, 96, 240, 400, 800]
partition 1: [401]
partition 2: []
partition 3: []
(The partition is computed as p = key.hashCode() % numPartitions.)
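A minimal sketch that reproduces this computation outside Spark (for non-negative Int keys, hashCode is the value itself):
val keys = Seq(8, 96, 240, 400, 401, 800)
val numPartitions = 4
// group each key by key.hashCode % numPartitions, as HashPartitioner does
keys.groupBy(k => k.hashCode % numPartitions).foreach { case (p, ks) =>
  println(s"partition $p: $ks")
}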
This partitioning leads to bad performance because the keys are not evenly distributed across all nodes. Since range partitioning can distribute the keys equally across the cluster, in what scenarios does hash partitioning prove to be a better fit than range partitioning?
While the weakness of hashCode is of some concern, especially when working with small integers, it can usually be addressed by adjusting the number of partitions based on domain-specific knowledge. It is also possible to replace the default HashPartitioner with a custom Partitioner that uses a more appropriate hashing function. As long as there is no data skew, hash partitioning behaves well enough at scale on average.
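For example, a minimal sketch of such a custom Partitioner (the class name and the choice of byteswap32 as the mixing function are illustrative assumptions, not anything Spark prescribes):
import org.apache.spark.Partitioner
import scala.util.hashing.byteswap32

class MixedHashPartitioner(partitions: Int) extends Partitioner {
  def numPartitions: Int = partitions
  def getPartition(key: Any): Int = {
    // scramble the raw hashCode so that small, regular integer keys
    // do not all collapse into a few residue classes
    val h = byteswap32(key.hashCode)
    // keep the result non-negative, as Spark's own HashPartitioner does
    ((h % partitions) + partitions) % partitions
  }
}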
Data skew is a completely different problem. If the key distribution is significantly skewed, then the distribution of the partitioned data is likely to be skewed no matter what Partitioner is used. Consider for example the following RDD, where 90% of the keys are mapped to 1:
sc.range(0, 1000).map(i => if (i < 900) 1 else i).map((_, None))
which simply cannot be uniformly partitioned.
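One quick way to see this (a sketch assuming a running SparkContext sc): whatever Partitioner is used, one partition necessarily receives at least the ~900 records sharing key 1.
val skewed = sc.range(0, 1000).map(i => if (i < 900) 1 else i).map((_, None))
// all records with key 1 land in the same partition, dwarfing the others
skewed.partitionBy(new org.apache.spark.HashPartitioner(4)).glom.map(_.length).collect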
Why not use RangePartitioner by default?
- It is less agnostic than HashPartitioner: while HashPartitioner requires only a proper implementation of ## and == for K, RangePartitioner requires an Ordering[K].
- Unlike HashPartitioner, it has to approximate the data distribution, so it requires an additional data scan.
- Because its splits are computed from a particular data distribution, it can be unstable when reused across datasets. Consider the following example:
val rdd1 = sc.range(0, 1000).map((_, None))
val rdd2 = sc.range(1000, 2000).map((_, None))
val rangePartitioner = new RangePartitioner(11, rdd1)
rdd1.partitionBy(rangePartitioner).glom.map(_.length).collect
Array[Int] = Array(88, 91, 99, 91, 87, 92, 83, 93, 91, 86, 99)
rdd2.partitionBy(rangePartitioner).glom.map(_.length).collect
Array[Int] = Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1000)
As you can imagine, this has serious implications for operations like joins. At the same time:
val hashPartitioner = new HashPartitioner(11)
rdd1.partitionBy(hashPartitioner).glom.map(_.length).collect
Array[Int] = Array(91, 91, 91, 91, 91, 91, 91, 91, 91, 91, 90)
rdd2.partitionBy(hashPartitioner).glom.map(_.length).collect
Array[Int] = Array(91, 91, 91, 91, 91, 91, 91, 91, 91, 90, 91)
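To make the join point concrete, here is a minimal sketch (with hypothetical left/right datasets): when both sides share the same Partitioner, the join can reuse the existing partitioning instead of re-shuffling, whereas range splits derived from one dataset do not transfer cleanly to another.
val left = sc.range(0, 1000).map((_, "left")).partitionBy(hashPartitioner)
val right = sc.range(500, 1500).map((_, "right")).partitionBy(hashPartitioner)
// both sides are co-partitioned by the same HashPartitioner, so the join
// does not need to shuffle either input again
left.join(right).partitioner  // Some(hashPartitioner)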
This brings us back to your question:
in what scenarios it is more advantageous than range partitioning.
Hash partitioning is the default approach in many systems because it is relatively agnostic, usually behaves reasonably well, and doesn't require additional information about the data distribution. These properties make it preferable in the absence of any a priori knowledge about the data.
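For instance (assuming a running SparkContext sc), Spark's own shuffle operations fall back to HashPartitioner when no partitioner is supplied:
// reduceByKey with no explicit partitioner goes through
// Partitioner.defaultPartitioner, which falls back to HashPartitioner
val counts = sc.range(0, 1000).map(i => (i % 10, 1L)).reduceByKey(_ + _)
counts.partitioner  // e.g. Some(org.apache.spark.HashPartitioner@...)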