In pyspark, why does `limit` followed by `repartition` create exactly equal partition sizes?

According to the pyspark documentation, repartition is supposed to use hash partitioning, which would give slightly unequal partition sizes. However, I have found that preceding it with limit produces exactly equal partition sizes. This can be shown by running the following in a pyspark shell:

df = spark.createDataFrame([range(5)] * 100)  # 100 identical rows of 5 columns (Python 2: range returns a list)

def count_part_size(part_iter):
    yield len(list(part_iter))

print(df.repartition(20).rdd.mapPartitions(count_part_size).collect())
# [4, 4, 4, 5, 4, 4, 5, 4, 5, 6, 6, 6, 7, 5, 5, 5, 5, 6, 5, 5]

print(df.limit(100).repartition(20).rdd.mapPartitions(count_part_size).collect())
# [5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5]

If repartition is using a hash partitioner, why would it produce exactly equal partition sizes in this case? And if it is not using a hash partitioner, what kind of partitioner is it using?

By the way, I am using Python 2.7.15 and Spark 2.0.2.

1 Answer

There are four factors here:

  • If no partitioning expression is provided, repartition doesn't use HashPartitioning, or to be specific, it doesn't use it directly. Instead it uses RoundRobinPartitioning, which (as you can probably guess)

    Distributes elements evenly across output partitions, starting from a random partition.

    Internally, it generates a sequence of scala.Int values on each partition, starting from a random point, and only these values (not the rows themselves) are passed through HashPartitioner.

  • It works this way because Int.hashCode is simply the identity, in other words

    ∀x ∈ Int: hashCode(x) = x

    (incidentally, that is the same behavior as CPython's hash within the Scala Int range, -2147483648 to 2147483647; these hashes are simply not designed to be cryptographically secure). As a result, applying HashPartitioner to a series of consecutive Int values results in an actual round-robin assignment.

    So in such a case HashPartitioner works simply as a modulo operator (see the sketch after this list).

  • You apply LIMIT before repartition, so all values are shuffled to a single node first. Therefore only one sequence of Int values is used.

  • The number of partitions is a divisor of the size of the dataset (20 divides 100), so the data can be distributed perfectly evenly among the partitions.
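
To make the mechanism concrete, here is a minimal plain-Python sketch of the effective computation on a single input partition (start, keys and num_partitions are illustrative names, not Spark internals):

import random
from collections import Counter

num_partitions = 20
rows = list(range(100))  # 100 rows sitting in a single input partition

# RoundRobinPartitioning: tag each row with a consecutive Int,
# starting from a random position
start = random.randrange(num_partitions)
keys = [start + i for i, _ in enumerate(rows)]

# hash() is the identity for small ints, so HashPartitioner
# reduces to a plain modulo over consecutive values
assignments = [hash(k) % num_partitions for k in keys]

print(sorted(Counter(assignments).values()))
# [5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5]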

Overall, it is a combination of intended behavior (each input partition should be distributed uniformly among output partitions), properties of the pipeline (there is only one input partition after the limit) and of the data (the dataset size is divisible by the number of partitions).
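
The last two factors are easy to check in the same pyspark shell, reusing df and count_part_size from the question (the outputs shown are what one would expect, not guaranteed verbatim):

# limit should collapse the data to a single partition first
print(df.limit(100).rdd.getNumPartitions())
# 1

# if the partition count does not divide the row count, the assignment is
# still round-robin, but partition sizes can differ by one
print(df.limit(100).repartition(3).rdd.mapPartitions(count_part_size).collect())
# e.g. [34, 33, 33]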
