
Spark: What is the difference between repartition and repartitionByRange?

I went through the documentation here: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html

It says:

  • for repartition: resulting DataFrame is hash partitioned.
  • for repartitionByRange: resulting DataFrame is range partitioned.

And a previous question also mentions it. However, I still don't understand: how exactly do they differ, and what is the impact of choosing one over the other?

More importantly, if repartition does hash partitioning, what impact does providing columns as its argument have?

asked Jan 20 '21 by pallupz

People also ask

What is difference between repartition and coalesce in Spark?

repartition does a full shuffle of the data and creates equal-sized partitions; coalesce combines existing partitions to avoid a full shuffle.
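A minimal PySpark sketch of that difference (this assumes an existing SparkSession; the numbers are arbitrary):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(1_000_000)

shuffled = df.repartition(8)   # full shuffle, roughly equal-sized partitions
merged = df.coalesce(2)        # merges existing partitions, avoids a full shuffle
print(shuffled.rdd.getNumPartitions(), merged.rdd.getNumPartitions())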

What is the difference between partition by and repartition?

They're often used in conjunction. Both repartition() and partitionBy can be used to "partition data based on dataframe column", but repartition() partitions the data in memory and partitionBy partitions the data on disk.
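A hedged sketch of that contrast (the DataFrame and the output path below are made up for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
events = spark.createDataFrame(
    [("2021-01-20", 1), ("2021-01-21", 2)], ["event_date", "value"]
)

# in memory: rows are shuffled into partitions by the hash of event_date
events.repartition(4, "event_date")

# on disk: one sub-directory per distinct event_date value
events.write.mode("overwrite").partitionBy("event_date").parquet("/tmp/events_by_date")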

What does repartition in Spark do?

The repartition function allows us to change the distribution of the data on the Spark cluster. This distribution change will induce shuffle (physical data movement) under the hood, which is quite an expensive operation.

Is repartition faster than coalesce?

Repartition always involves a shuffle. Repartition works by creating new partitions and doing a full shuffle to move data around. Results in more or less equal sized partitions. Since a full shuffle takes place, repartition is less performant than coalesce.


2 Answers

I think it is best to look into the difference with some experiments.

Test Dataframes

For this experiment, I am using the following two Dataframes (I am showing the code in Scala, but the concept is identical for the Python API):

// Dataframe with one column "value" containing the values ranging from 0 to 1000000
val df = Seq(0 to 1000000: _*).toDF("value")

// Dataframe with one column "value" containing the number 0 repeated 1000001 times, plus the values 5000, 10000 and 100000
val df2 = Seq((0 to 1000000).map(_ => 0) :+ 5000 :+ 10000 :+ 100000: _*).toDF("value")

Theory

  • repartition applies the HashPartitioner when one or more columns are provided and the RoundRobinPartitioner when no column is provided. If one or more columns are provided (HashPartitioner), those values will be hashed and used to determine the partition number by calculating something like partition = hash(columns) % numberOfPartitions. If no column is provided (RoundRobinPartitioner) the data gets evenly distributed across the specified number of partitions.

  • repartitionByRange will partition the data based on a range of the column values. This is usually used for continuous (not discrete) values such as any kind of numbers. Note that due to performance reasons this method uses sampling to estimate the ranges. Hence, the output may not be consistent, since sampling can return different values. The sample size can be controlled by the config spark.sql.execution.rangeExchange.sampleSizePerPartition.

It is also worth mentioning that, for both methods, if numPartitions is not given, the Dataframe is partitioned into the number of partitions configured by spark.sql.shuffle.partitions in your Spark session, and the result may be coalesced further by Adaptive Query Execution (available since Spark 3.x).
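Since the question is about PySpark, here is a small sketch of these defaults in Python (the config value is just an example; df_py mirrors the Scala df above):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", "8")   # example value

# PySpark analogue of the Scala df above: one column "value" with 0 to 1000000
df_py = spark.range(0, 1_000_001).withColumnRenamed("id", "value")

# both calls omit numPartitions, so they target spark.sql.shuffle.partitions (8) partitions;
# with AQE enabled (Spark 3.x) the final partition count may be coalesced further
print(df_py.repartition(col("value")).rdd.getNumPartitions())
print(df_py.repartitionByRange(col("value")).rdd.getNumPartitions())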

Test Setup

Based on the test data above, I always apply the same code:

import org.apache.spark.sql.functions._ // for spark_partition_id, col, count, min, max

val testDf = df
// here I will insert the partition logic
    .withColumn("partition", spark_partition_id()) // applying SQL built-in function to determine actual partition
    .groupBy(col("partition"))
    .agg(
      count(col("value")).as("count"),
      min(col("value")).as("min_value"),
      max(col("value")).as("max_value"))
    .orderBy(col("partition"))

testDf.show(false)
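For readers following along in PySpark, an equivalent of this harness looks like the following (a direct translation; it assumes the df_py DataFrame sketched above, or any DataFrame with a "value" column):

from pyspark.sql.functions import spark_partition_id, col, count
from pyspark.sql.functions import min as min_, max as max_

test_df = (
    df_py  # insert the partition logic here, e.g. .repartition(4, col("value"))
    .withColumn("partition", spark_partition_id())  # actual partition id per row
    .groupBy("partition")
    .agg(
        count(col("value")).alias("count"),
        min_(col("value")).alias("min_value"),
        max_(col("value")).alias("max_value"),
    )
    .orderBy("partition")
)
test_df.show(truncate=False)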

Test Results

df.repartition(4, col("value"))

As expected, we get 4 partitions, and because the values of df range from 0 to 1000000, their hashed values result in a well-distributed Dataframe.

+---------+------+---------+---------+
|partition|count |min_value|max_value|
+---------+------+---------+---------+
|0        |249911|12       |1000000  |
|1        |250076|6        |999994   |
|2        |250334|2        |999999   |
|3        |249680|0        |999998   |
+---------+------+---------+---------+

df.repartitionByRange(4, col("value"))

Also in this case we get 4 partitions, but this time the min and max values clearly show the ranges of values within each partition. The data is almost equally distributed, with roughly 250000 values per partition.

+---------+------+---------+---------+
|partition|count |min_value|max_value|
+---------+------+---------+---------+
|0        |244803|0        |244802   |
|1        |255376|244803   |500178   |
|2        |249777|500179   |749955   |
|3        |250045|749956   |1000000  |
+---------+------+---------+---------+

df2.repartition(4, col("value"))

Now we are using the other Dataframe, df2. Here, the hashing algorithm hashes the values, which are only 0, 5000, 10000 or 100000. Since the hash of the value 0 is always the same, all zeros end up in the same partition (in this case partition 3). The other three partitions each contain only one value.

+---------+-------+---------+---------+
|partition|count  |min_value|max_value|
+---------+-------+---------+---------+
|0        |1      |100000   |100000   |
|1        |1      |10000    |10000    |
|2        |1      |5000     |5000     |
|3        |1000001|0        |0        |
+---------+-------+---------+---------+

df2.repartition(4)

Without using the content of the column "value", the repartition method distributes the rows on a round-robin basis. All partitions contain almost the same amount of data.

+---------+------+---------+---------+
|partition|count |min_value|max_value|
+---------+------+---------+---------+
|0        |250002|0        |5000     |
|1        |250002|0        |10000    |
|2        |249998|0        |100000   |
|3        |250002|0        |0        |
+---------+------+---------+---------+

df2.repartitionByRange(4, col("value"))

This case shows that the Dataframe df2 is not well suited for repartitioning by range, as almost all values are 0. Therefore, we end up with only two partitions, where partition 0 contains all the zeros.

+---------+-------+---------+---------+
|partition|count  |min_value|max_value|
+---------+-------+---------+---------+
|0        |1000001|0        |0        |
|1        |3      |5000     |100000   |
+---------+-------+---------+---------+
answered Nov 13 '22 by Michael Heil

By using df.explain you can get a lot of information about these operations.

I'm using this DataFrame for the example :

df = spark.createDataFrame([(i, f"value {i}") for i in range(1, 22, 1)], ["id", "value"])

Repartition

Depending on whether a key expression (column) is specified or not, the partitioning method differs. It is not always hash partitioning, as you assumed.

df.repartition(3).explain(True)

== Parsed Logical Plan ==
Repartition 3, true
+- LogicalRDD [id#0L, value#1], false

== Analyzed Logical Plan ==
id: bigint, value: string
Repartition 3, true
+- LogicalRDD [id#0L, value#1], false

== Optimized Logical Plan ==
Repartition 3, true
+- LogicalRDD [id#0L, value#1], false

== Physical Plan ==
Exchange RoundRobinPartitioning(3)
+- Scan ExistingRDD[id#0L,value#1]

We can see in the generated physical plan that RoundRobinPartitioning is used:

Represents a partitioning where rows are distributed evenly across output partitions by starting from a random target partition number and distributing rows in a round-robin fashion. This partitioning is used when implementing the DataFrame.repartition() operator.

When using repartition by column expression:

df.repartition(3, "id").explain(True)

== Parsed Logical Plan ==
'RepartitionByExpression ['id], 3
+- LogicalRDD [id#0L, value#1], false

== Analyzed Logical Plan ==
id: bigint, value: string
RepartitionByExpression [id#0L], 3
+- LogicalRDD [id#0L, value#1], false

== Optimized Logical Plan ==
RepartitionByExpression [id#0L], 3
+- LogicalRDD [id#0L, value#1], false

== Physical Plan ==
Exchange hashpartitioning(id#0L, 3)
+- Scan ExistingRDD[id#0L,value#1]

Now the chosen partitioning method is hashpartitioning. With hash partitioning, a hash is computed over the key expressions for each row, and the destination partition_id is determined with a modulo: hash(key) % numPartitions.
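As a rough cross-check (this assumes that the built-in hash function matches the hashing used by hashpartitioning), you can compare the actual partition id with a hand-computed one on the same df:

from pyspark.sql.functions import expr, spark_partition_id

(
    df.repartition(3, "id")
      .withColumn("actual_partition", spark_partition_id())
      .withColumn("computed_partition", expr("pmod(hash(id), 3)"))
      .show()
)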

RepartitionByRange

This partitioning method creates numPartitions consecutive, non-overlapping ranges of values based on the partitioning key. Thus, at least one key expression is required, and it needs to be orderable.

df.repartitionByRange(3, "id").explain(True)

== Parsed Logical Plan ==
'RepartitionByExpression ['id ASC NULLS FIRST], 3
+- LogicalRDD [id#0L, value#1], false

== Analyzed Logical Plan ==
id: bigint, value: string
RepartitionByExpression [id#0L ASC NULLS FIRST], 3
+- LogicalRDD [id#0L, value#1], false

== Optimized Logical Plan ==
RepartitionByExpression [id#0L ASC NULLS FIRST], 3
+- LogicalRDD [id#0L, value#1], false

== Physical Plan ==
Exchange rangepartitioning(id#0L ASC NULLS FIRST, 3)
+- Scan ExistingRDD[id#0L,value#1]

Looking at the generated physical plan, we can see that rangepartitioning differs from the other two methods described above by the presence of an ordering clause in the partitioning expression. When no explicit sort order is specified in the expression, ascending order is used by default.
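For instance (a sketch; the exact plan text may vary by Spark version), specifying a descending key changes that clause:

from pyspark.sql.functions import col

# explicit descending sort order on the range partitioning key
df.repartitionByRange(3, col("id").desc()).explain(True)
# the physical plan then shows something like:
# Exchange rangepartitioning(id#0L DESC NULLS LAST, 3)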

Some interesting links:

  • Repartition Logical Operators — Repartition and RepartitionByExpression
  • Range partitioning in Apache SparkSQL
  • hash vs range partitioning
answered Nov 13 '22 by blackbishop