I have this simple Spark program. I am wondering why all the data ends up in one partition.
val l = List((30002,30000), (50006,50000), (80006,80000),
(4,0), (60012,60000), (70006,70000),
(40006,40000), (30012,30000), (30000,30000),
(60018,60000), (30020,30000), (20010,20000),
(20014,20000), (90008,90000), (14,0), (90012,90000),
(50010,50000), (100008,100000), (80012,80000),
(20000,20000), (30010,30000), (20012,20000),
(90016,90000), (18,0), (12,0), (70016,70000),
(20,0), (80020,80000), (100016,100000), (70014,70000),
(60002,60000), (40000,40000), (60006,60000),
(80000,80000), (50008,50000), (60008,60000),
(10002,10000), (30014,30000), (70002,70000),
(40010,40000), (100010,100000), (40002,40000),
(20004,20000),
(10018,10000), (50018,50000), (70004,70000),
(90004,90000), (100004,100000), (20016,20000))
val l_rdd = sc.parallelize(l, 2)
// print each item and index of the partition it belongs to
l_rdd.mapPartitionsWithIndex((index, iter) => {
  iter.toList.map(x => (index, x)).iterator
}).collect.foreach(println)
// reduce on the second element of each pair (used as the key).
// alternatively, you can use aggregateByKey
val l_reduced = l_rdd.map(x => {
  (x._2, List(x._1))
}).reduceByKey((a, b) => {b ::: a})
// print the reduced results along with their partition indices
l_reduced.mapPartitionsWithIndex((index, iter) => {
  iter.toList.map(x => (index, x._1, x._2.size)).iterator
}).collect.foreach(println)
When you run this, you will see that the data (l_rdd) is distributed across two partitions. After the reduce, the resulting RDD (l_reduced) also has two partitions, but all the data is in one partition (index 0) and the other one is empty. This happens even when the data is huge (a few GBs). Shouldn't l_reduced also be distributed across two partitions?
By default, Spark/PySpark creates as many partitions as there are CPU cores on the machine. The data of each partition resides on a single machine, and Spark/PySpark creates one task per partition. Spark shuffle operations move data from one partition to other partitions.
repartition() can be used to increase or decrease the number of partitions of an RDD or DataFrame.
reduceByKey merges the values for each key using an associative and commutative reduce function. It also performs the merging locally on each mapper before sending results to a reducer, similar to a "combiner" in MapReduce.
The coalesce transformation lets you repartition a dataset into a lower number of partitions than it originally had. Here, there is no need to specify any key.
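For illustration, here is a minimal sketch (assuming only a SparkContext named sc, as in the spark-shell) that contrasts the two:
val rdd = sc.parallelize(1 to 100, 8)
// repartition performs a full shuffle and can increase or decrease the partition count
val up = rdd.repartition(16)
// coalesce (without shuffle = true) avoids a full shuffle and can only reduce the partition count
val down = rdd.coalesce(2)
println(up.getNumPartitions)   // 16
println(down.getNumPartitions) // 2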
val l_reduced = l_rdd.map(x => {
  (x._2, List(x._1))
}).reduceByKey((a, b) => {b ::: a})
With reference to the above snippet, you are partitioning by the second field of the RDD. All the numbers in the second field end in 0, so they are all even.
reduceByKey uses a HashPartitioner by default, and the partition number for a record is decided by the following function:
def getPartition(key: Any): Int = key match {
  case null => 0
  case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
And the Utils.nonNegativeMod is defined as follows:
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}
Let us see what happens when we apply the above two pieces of logic to your input:
scala> l.map(_._2.hashCode % 2) // numPartitions = 2
res10: List[Int] = List(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
Therefore, all of your records end up in partition 0.
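As a quick sanity check (a sketch that reuses the l_reduced from the snippets above), you can verify that reduceByKey fell back to a HashPartitioner and inspect how many records each partition actually holds:
// the reduced RDD carries the partitioner that reduceByKey used
l_reduced.partitioner.foreach(p => println(p.getClass.getSimpleName)) // HashPartitioner
// glom() turns each partition into an array, so the lengths are the per-partition record counts
l_reduced.glom().map(_.length).collect().foreach(println)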
You can solve this problem by repartitioning:
val l_reduced = l_rdd.map(x => {
  (x._2, List(x._1))
}).reduceByKey((a, b) => {b ::: a}).repartition(2)
which gives:
(0,100000,4)
(0,10000,2)
(0,0,5)
(0,20000,6)
(0,60000,5)
(0,80000,4)
(1,50000,4)
(1,30000,6)
(1,90000,4)
(1,70000,5)
(1,40000,4)
Alternatively, you can create a custom partitioner.
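For example, a hypothetical partitioner (a sketch, not a class that ships with Spark) could divide these particular keys by 10000 before taking the modulus, so the key groups spread across both partitions:
import org.apache.spark.Partitioner

// hypothetical partitioner: divide the key by 10000 first so that
// keys 0, 10000, 20000, ... alternate between partitions
class TenThousandsPartitioner(parts: Int) extends Partitioner {
  override def numPartitions: Int = parts
  override def getPartition(key: Any): Int = {
    val rawMod = (key.asInstanceOf[Int] / 10000) % parts
    rawMod + (if (rawMod < 0) parts else 0) // same non-negative-mod trick as above
  }
}

// pass it explicitly to reduceByKey instead of the default HashPartitioner
val l_custom = l_rdd.map(x => {
  (x._2, List(x._1))
}).reduceByKey(new TenThousandsPartitioner(2), (a, b) => {b ::: a})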