How does Round Robin partitioning in Spark work?

I'm having trouble understanding round robin partitioning in Spark. Consider the following example: I split a Seq of size 3 into 3 partitions:

val df = Seq(0,1,2).toDF().repartition(3)

df.explain

== Physical Plan ==
Exchange RoundRobinPartitioning(3)
+- LocalTableScan [value#42]

Now if I inspect the partitions, I get:

df
  .rdd
  .mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}
  .toDF("partition_index","number_of_records")
  .show

+---------------+-----------------+
|partition_index|number_of_records|
+---------------+-----------------+
|              0|                0|
|              1|                2|
|              2|                1|
+---------------+-----------------+

If I do the same with Seq of size 8 and split it into 8 partitions, I get even worse skew:

(0 to 7).toDF().repartition(8)
  .rdd
  .mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}
  .toDF("partition_index","number_of_records")
  .show

+---------------+-----------------+
|partition_index|number_of_records|
+---------------+-----------------+
|              0|                0|
|              1|                0|
|              2|                0|
|              3|                0|
|              4|                0|
|              5|                0|
|              6|                4|
|              7|                4|
+---------------+-----------------+

Can somebody explain this behavior? As far as I understand round robin partitioning, all partitions should be approximately the same size.

asked Jan 10 '19 by Raphael Roth

People also ask

How does round robin partition work?

Round-robin partitioning is used to achieve an equal distribution of rows to partitions. However, unlike hash partitioning, you do not have to specify partitioning columns. With round-robin partitioning, new rows are assigned to partitions on a rotation basis. The table must not have primary keys.
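In its textbook form, the rotation is just a running row index modulo the number of partitions; a minimal sketch in Scala:

// Textbook round-robin: row i goes to partition i % numPartitions,
// so partition sizes differ by at most one row.
def roundRobinPartition(rowIndex: Int, numPartitions: Int): Int =
  rowIndex % numPartitions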

How does partition happen in Spark?

Spark Partitioning in a nutshell In order to achieve high parallelism, Spark will split the data into smaller chunks called partitions which are distributed across different nodes in the Spark Cluster. Every node, can have more than one executor each of which can execute a task.

How does hash partitioning work in Spark?

Spark's default Hash Partitioner works on the concept of the hashCode() method: equal objects have the same hash code. On this basis, the Hash Partitioner sends keys with the same hash code to the same partition and distributes the keys across the partitions.
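A minimal sketch of that idea in plain Scala (the nonNegativeMod helper mirrors the one in Spark's Utils; the names here are illustrative, not the actual API):

// Map a possibly negative hashCode into [0, mod).
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

// Equal keys always land in the same partition.
def hashPartition(key: Any, numPartitions: Int): Int =
  key match {
    case null => 0
    case k    => nonNegativeMod(k.hashCode, numPartitions)
  }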

What are the types of partitioning in Spark?

Apache Spark supports two types of partitioning: "hash partitioning" and "range partitioning".
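Both can be requested explicitly on a DataFrame; a short example, assuming the df from the question (which has a value column):

import org.apache.spark.sql.functions.col

// Hash partitioning: rows with equal values in "value" land in the same partition.
val hashed = df.repartition(8, col("value"))

// Range partitioning: rows are split into contiguous, non-overlapping value ranges
// (repartitionByRange is available since Spark 2.3).
val ranged = df.repartitionByRange(8, col("value"))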


1 Answer

(Checked for Spark versions 2.1-2.4)

As far as I can see from the ShuffleExchangeExec code, Spark tries to partition the rows directly from the original partitions (via mapPartitions) without bringing anything to the driver.

The logic is to start with a randomly picked target partition and then assign target partitions to the rows in round-robin fashion. Note that a "start" partition is picked independently for each source partition, so there can be collisions.

The final distribution depends on several factors: the number of source and target partitions and the number of rows in your DataFrame.
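To make this concrete, here is a minimal, self-contained sketch in plain Scala (not the Spark API) of that assignment logic, assuming the random start is seeded with the source partition id as in ShuffleExchangeExec. With 8 source partitions of one row each, as in the (0 to 7) example, several starts collide, so some target partitions stay empty while others collect multiple rows:

import scala.util.Random

object RoundRobinSkewDemo {
  def main(args: Array[String]): Unit = {
    val numTargetPartitions = 8
    // 8 source partitions with 1 row each, as in the (0 to 7) example.
    val rowsPerSourcePartition = Seq.fill(8)(1)

    val counts = new Array[Int](numTargetPartitions)
    for ((numRows, sourceIdx) <- rowsPerSourcePartition.zipWithIndex) {
      // Each source partition picks a random starting target partition,
      // then hands out its rows round-robin from there.
      var position = new Random(sourceIdx).nextInt(numTargetPartitions)
      for (_ <- 0 until numRows) {
        counts(position % numTargetPartitions) += 1
        position += 1
      }
    }
    counts.zipWithIndex.foreach { case (n, i) =>
      println(s"target partition $i: $n rows")
    }
  }
}

The exact counts depend on the seeds, but whenever two source partitions pick the same start, their rows pile onto the same target partitions, which is exactly the skew shown in the question.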

answered Nov 15 '22 by Sergey Khudyakov