I've trouble to understand Round Robin Partitioning in Spark. Consider the following exampl. I split a Seq of size 3 into 3 partitions:
val df = Seq(0,1,2).toDF().repartition(3)
df.explain
== Physical Plan ==
Exchange RoundRobinPartitioning(3)
+- LocalTableScan [value#42]
Now if I inspect the partitions, I get:
df
.rdd
.mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}
.toDF("partition_index","number_of_records")
.show
+---------------+-----------------+
|partition_index|number_of_records|
+---------------+-----------------+
| 0| 0|
| 1| 2|
| 2| 1|
+---------------+-----------------+
If I do the same with Seq of size 8 and split it into 8 partitions, I get even worse skew:
(0 to 7).toDF().repartition(8)
.rdd
.mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}
.toDF("partition_index","number_of_records")
.show
+---------------+-----------------+
|partition_index|number_of_records|
+---------------+-----------------+
| 0| 0|
| 1| 0|
| 2| 0|
| 3| 0|
| 4| 0|
| 5| 0|
| 6| 4|
| 7| 4|
+---------------+-----------------+
Can somebody explain this behavior. As far as I understand round robin partitioning, all partitions show be ~same size.
Round-robin partitioning is used to achieve an equal distribution of rows to partitions. However, unlike hash partitioning, you do not have to specify partitioning columns. With round-robin partitioning, new rows are assigned to partitions on a rotation basis. The table must not have primary keys.
Spark Partitioning in a nutshell In order to achieve high parallelism, Spark will split the data into smaller chunks called partitions which are distributed across different nodes in the Spark Cluster. Every node, can have more than one executor each of which can execute a task.
Spark Default PartitionerThe Hash Partitioner works on the concept of using the hashcode() function. The concept of hashcode() is that equal objects have the same hashcode. On the basis of this concept, the Hash Partitioner will divide the keys that have the same hashcode and distribute them across the partitions.
Apache Spark supports two types of partitioning “hash partitioning” and “range partitioning”.
(Checked for Spark version 2.1-2.4)
As far as I can see from ShuffleExchangeExec
code, Spark tries to partition the rows directly from original partitions (via mapPartitions
) without bringing anything to the driver.
The logic is to start with a randomly picked target partition and then assign partitions to the rows in a round-robin method. Note that "start" partition is picked for each source partition and there could be collisions.
The final distribution depends on many factors: a number of source/target partitions and the number of rows in your dataframe.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With