 

Why does all the data end up in one partition after reduceByKey?


I have this simple Spark program. I am wondering why all the data ends up in one partition.

val l = List((30002,30000), (50006,50000), (80006,80000), 
             (4,0), (60012,60000), (70006,70000), 
             (40006,40000), (30012,30000), (30000,30000),
             (60018,60000), (30020,30000), (20010,20000), 
             (20014,20000), (90008,90000), (14,0), (90012,90000),
             (50010,50000), (100008,100000), (80012,80000),
             (20000,20000), (30010,30000), (20012,20000), 
             (90016,90000), (18,0), (12,0), (70016,70000), 
             (20,0), (80020,80000), (100016,100000), (70014,70000),
             (60002,60000), (40000,40000), (60006,60000), 
             (80000,80000), (50008,50000), (60008,60000), 
             (10002,10000), (30014,30000), (70002,70000),
             (40010,40000), (100010,100000), (40002,40000),
             (20004,20000), 
             (10018,10000), (50018,50000), (70004,70000),
             (90004,90000), (100004,100000), (20016,20000))

val l_rdd = sc.parallelize(l, 2)

// print each item and index of the partition it belongs to
l_rdd.mapPartitionsWithIndex((index, iter) => {
   iter.toList.map(x => (index, x)).iterator
}).collect.foreach(println)

// reduce on the second element of the list.
// alternatively you can use aggregateByKey  
val l_reduced = l_rdd.map(x => {
                    (x._2, List(x._1))
                  }).reduceByKey((a, b) => {b ::: a})

// print the reduced results along with its partition index
l_reduced.mapPartitionsWithIndex((index, iter) => {
      iter.toList.map(x => (index, x._1, x._2.size)).iterator
}).collect.foreach(println)

When you run this, you will see that the data (l_rdd) is distributed across two partitions. After the reduce, the resulting RDD (l_reduced) also has two partitions, but all the data sits in one partition (index 0) and the other one is empty. This happens even when the data is huge (a few GBs). Shouldn't l_reduced also be distributed across two partitions?

Asked Feb 06 '17 by Shirish Kumar

People also ask

How does Spark decide the number of partitions?

By default, Spark/PySpark creates as many partitions as there are CPU cores in the machine. The data of each partition resides on a single machine, and Spark/PySpark creates a task for each partition. Spark shuffle operations move data from one partition to other partitions.
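For example, you can check this in the Spark shell (a quick sketch, assuming the usual sc SparkContext is in scope):

// Default parallelism usually equals the total number of cores
println(sc.defaultParallelism)

// An explicit partition count can also be passed, as the question's code does
val rdd = sc.parallelize(1 to 100, 4)
println(rdd.getNumPartitions)  // 4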

How do I change the number of partitions in a Spark DataFrame?

repartition() can be used for increasing or decreasing the number of partitions of a Spark DataFrame.
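A minimal sketch in the Spark shell (shown on an RDD here; DataFrame.repartition behaves the same way):

val rdd = sc.parallelize(1 to 100, 2)
println(rdd.repartition(8).getNumPartitions)  // 8, after a full shuffle
println(rdd.repartition(1).getNumPartitions)  // 1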

What does the reduceByKey function do?

Merge the values for each key using an associative and commutative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.
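For instance, summing the values for each key:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
pairs.reduceByKey(_ + _).collect().foreach(println)
// (a,4)
// (b,2)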

Can coalesce increase partitions?

The coalesce transformation allows the user to repartition a dataset into a lower number of partitions than the original. There is no need to specify any key here.
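A small illustration; note that with the default shuffle = false, asking coalesce for more partitions is a no-op, while passing shuffle = true allows an increase:

val rdd = sc.parallelize(1 to 100, 8)
println(rdd.coalesce(2).getNumPartitions)                    // 2
println(rdd.coalesce(16).getNumPartitions)                   // still 8
println(rdd.coalesce(16, shuffle = true).getNumPartitions)   // 16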


1 Answer

val l_reduced = l_rdd.map(x => {
                    (x._2, List(x._1))
                  }).reduceByKey((a, b) => {b ::: a})

With reference to the above snippet, you are partitioning by the second field of the RDD. All the numbers in the second field are even (they all end in 0).

When the default HashPartitioner is used (as it is for reduceByKey here), the partition number for a record is decided by the following function:

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

And the Utils.nonNegativeMod is defined as follows:

 def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

Let us see what happens when we apply the above two pieces of logic to your input:

scala> l.map(_._2.hashCode % 2) // numPartitions = 2
res10: List[Int] = List(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)

In Scala, the hashCode of an Int is the value itself; since every key is even, key.hashCode % 2 is 0 for every record. Therefore, all of your records end up in partition 0.

You can solve this problem by repartitioning:

val l_reduced = l_rdd.map(x => {
                    (x._2, List(x._1))
                  }).reduceByKey((a, b) => {b ::: a}).repartition(2)

which gives:

(0,100000,4)
(0,10000,2)
(0,0,5)
(0,20000,6)
(0,60000,5)
(0,80000,4)
(1,50000,4)
(1,30000,6)
(1,90000,4)
(1,70000,5)
(1,40000,4)

Alternatively, you can create a custom partitioner.
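Here is a minimal sketch of such a partitioner, assuming the keys stay multiples of 10000 as in this example (the class name and the division by 10000 are illustrative choices, not anything prescribed by Spark):

import org.apache.spark.Partitioner

// Illustrative sketch: route keys by their magnitude instead of by
// hashCode % numPartitions, so even keys don't all land in partition 0.
class TenThousandsPartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int =
    (key.asInstanceOf[Int] / 10000) % numParts
}

val l_custom = l_rdd.map(x => (x._2, List(x._1)))
                    .reduceByKey(new TenThousandsPartitioner(2), (a, b) => b ::: a)

With these keys (0, 10000, 20000, ...), the quotient alternates between even and odd, so the two partitions end up roughly balanced.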

Answered Sep 24 '22 by axiom