Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How does HashPartitioner work?

I read up on the documentation of HashPartitioner. Unfortunately nothing much was explained except for the API calls. I am under the assumption that HashPartitioner partitions the distributed set based on the hash of the keys. For example if my data is like

(1,1), (1,2), (1,3), (2,1), (2,2), (2,3) 

So partitioner would put this into different partitions with same keys falling in the same partition. However I do not understand the significance of the constructor argument

new HashPartitoner(numPartitions) //What does numPartitions do? 

For the above dataset how would the results differ if I did

new HashPartitoner(1) new HashPartitoner(2) new HashPartitoner(10) 

So how does HashPartitioner work actually?

like image 560
Sohaib Avatar asked Jul 15 '15 07:07

Sohaib


People also ask

How does hash partitioning work in spark?

Spark Default PartitionerThe Hash Partitioner works on the concept of using the hashcode() function. The concept of hashcode() is that equal objects have the same hashcode. On the basis of this concept, the Hash Partitioner will divide the keys that have the same hashcode and distribute them across the partitions.

How does spark repartition work?

Repartition is a method in spark which is used to perform a full shuffle on the data present and creates partitions based on the user's input. The resulting data is hash partitioned and the data is equally distributed among the partitions.

How does spark determine number of partitions?

The number of partitions in spark should be decided thoughtfully based on the cluster configuration and requirements of the application. Increasing the number of partitions will make each partition have less data or no data at all.

What is Exchange HashPartitioning?

HashPartitioning is an Expression that cannot be evaluated (and produce a value given an internal row). HashPartitioning uses the MurMur3 Hash to compute the partitionId for data distribution (consistent for shuffling and bucketing that is crucial for joins of bucketed and regular tables).


2 Answers

Well, lets make your dataset marginally more interesting:

val rdd = sc.parallelize(for {     x <- 1 to 3     y <- 1 to 2 } yield (x, None), 8) 

We have six elements:

rdd.count 
Long = 6 

no partitioner:

rdd.partitioner 
Option[org.apache.spark.Partitioner] = None 

and eight partitions:

rdd.partitions.length 
Int = 8 

Now lets define small helper to count number of elements per partition:

import org.apache.spark.rdd.RDD  def countByPartition(rdd: RDD[(Int, None.type)]) = {     rdd.mapPartitions(iter => Iterator(iter.length)) } 

Since we don't have partitioner our dataset is distributed uniformly between partitions (Default Partitioning Scheme in Spark):

countByPartition(rdd).collect() 
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1) 

inital-distribution

Now lets repartition our dataset:

import org.apache.spark.HashPartitioner val rddOneP = rdd.partitionBy(new HashPartitioner(1)) 

Since parameter passed to HashPartitioner defines number of partitions we have expect one partition:

rddOneP.partitions.length 
Int = 1 

Since we have only one partition it contains all elements:

countByPartition(rddOneP).collect 
Array[Int] = Array(6) 

hash-partitioner-1

Note that the order of values after the shuffle is non-deterministic.

Same way if we use HashPartitioner(2)

val rddTwoP = rdd.partitionBy(new HashPartitioner(2)) 

we'll get 2 partitions:

rddTwoP.partitions.length 
Int = 2 

Since rdd is partitioned by key data won't be distributed uniformly anymore:

countByPartition(rddTwoP).collect() 
Array[Int] = Array(2, 4) 

Because with have three keys and only two different values of hashCode mod numPartitions there is nothing unexpected here:

(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2)) 
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1)) 

Just to confirm the above:

rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect() 
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3)) 

hash-partitioner-2

Finally with HashPartitioner(7) we get seven partitions, three non-empty with 2 elements each:

val rddSevenP = rdd.partitionBy(new HashPartitioner(7)) rddSevenP.partitions.length 
Int = 7 
countByPartition(rddTenP).collect() 
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0) 

hash-partitioner-7

Summary and Notes

  • HashPartitioner takes a single argument which defines number of partitions
  • values are assigned to partitions using hash of keys. hash function may differ depending on the language (Scala RDD may use hashCode, DataSets use MurmurHash 3, PySpark, portable_hash).

    In simple case like this, where key is a small integer, you can assume that hash is an identity (i = hash(i)).

    Scala API uses nonNegativeMod to determine partition based on computed hash,

  • if distribution of keys is not uniform you can end up in situations when part of your cluster is idle

  • keys have to be hashable. You can check my answer for A list as a key for PySpark's reduceByKey to read about PySpark specific issues. Another possible problem is highlighted by HashPartitioner documentation:

    Java arrays have hashCodes that are based on the arrays' identities rather than their contents, so attempting to partition an RDD[Array[]] or RDD[(Array[], _)] using a HashPartitioner will produce an unexpected or incorrect result.

  • In Python 3 you have to make sure that hashing is consistent. See What does Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED mean in pyspark?

  • Hash partitioner is neither injective nor surjective. Multiple keys can be assigned to a single partition and some partitions can remain empty.

  • Please note that currently hash based methods don't work in Scala when combined with REPL defined case classes (Case class equality in Apache Spark).

  • HashPartitioner (or any other Partitioner) shuffles the data. Unless partitioning is reused between multiple operations it doesn't reduce amount of data to be shuffled.

like image 57
zero323 Avatar answered Oct 13 '22 09:10

zero323


RDD is distributed this means it is split on some number of parts. Each of this partitions is potentially on different machine. Hash partitioner with argument numPartitions chooses on what partition to place pair (key, value) in following way:

  1. Creates exactly numPartitions partitions.
  2. Places (key, value) in partition with number Hash(key) % numPartitions
like image 41
abalcerek Avatar answered Oct 13 '22 08:10

abalcerek