Why can't sortBy() sort the data evenly in Spark?

I'm writing a PySpark script to read in a big two-dimensional array, so I first generate an index RDD and map it with a read method to read in the corresponding rows. For example, if I have an array with 10 rows, I want those 10 rows evenly partitioned so that each partition has 2 rows. I tried this with sortBy():

rdd = sc.range(0, 10, 1).sortBy(lambda x: x, numPartitions=5)
rdd.glom().collect()

However, the result came out as:

[[0, 1, 2], [3, 4], [5, 6], [7, 8], [9]]

This indicates that sortBy() didn't work as I expected: the first partition has 3 numbers while the last has only 1. When I map each partition with another read method, the partition sizes differ, which sometimes gives rise to stragglers.

I also tried another way of generating the RDD:

rdd = sc.parallelize(range(0, 10, 1), 5)
rdd.glom().collect()

This returns the result I want:

[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]

Can someone help explain why the first method, using sortBy(), doesn't return an evenly partitioned result?

asked Feb 05 '23 by American curl

1 Answer

Because it is not designed to. In the general case it is not possible to partition data (including with range partitioning) into equal-size partitions. Remember that, by the contract of the partitioner, all records for a specific value have to reside in a single partition. Even in cases where a uniform distribution is achievable, determining the exact partition boundaries would be prohibitively expensive.
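
You can see that contract at work with a skewed key. A minimal sketch, assuming the same sc as above (the exact boundaries depend on sampling, but every copy of the repeated value must land in a single partition):

skewed = sc.parallelize([1] * 6 + [2, 3, 4, 5])
skewed.sortBy(lambda x: x, numPartitions=5).glom().collect()
# e.g. [[1, 1, 1, 1, 1, 1], [2], [3], [4], [5]] - an even split is impossible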

Because of that, Spark samples the data with the goal of obtaining ranges of approximately uniform size, and this behavior is good enough for typical Spark applications.

SparkContext.parallelize doesn't use a partitioner at all. Instead, it computes splits based on the semantics of the specific input and can therefore create splits of equal size.
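
For intuition, the slicing behaves roughly like the pure-Python sketch below (even_slices is a made-up helper for illustration, not Spark's actual code):

def even_slices(seq, num_slices):
    # Split seq into num_slices contiguous chunks whose sizes differ by at most 1.
    n = len(seq)
    return [seq[n * i // num_slices : n * (i + 1) // num_slices]
            for i in range(num_slices)]

even_slices(list(range(10)), 5)
# [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]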

If you have prior knowledge about the data distribution, you can always design a custom partitioning function that produces the desired output. For example:

import bisect
from functools import partial

# Upper boundaries of the first four partitions; for a key x,
# bisect returns the index of the partition it belongs to.
partition_func = partial(bisect.bisect, [2, 4, 6, 8])

rdd = (sc.range(0, 10)
    .map(lambda x: (x, None))  # partitioning APIs operate on (key, value) pairs
    .repartitionAndSortWithinPartitions(5, partition_func)
    .keys())
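
Checking the layout confirms that the boundaries above produce the even split:

rdd.glom().collect()
# [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]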

For relatively short (up to 1 << 60 or so) series of integers, you could use hash partitioning in CPython:

(sc.range(0, 10, 1)
    .map(lambda x: (x, None))  # wrap values as (key, value) pairs
    .partitionBy(10)           # default partition function is portable_hash
    .keys()
    .glom()
    .collect())
# [[0], [1], [2], [3], [4], [5], [6], [7], [8], [9]]

but this is just an implementation detail (in CPython, hash(x), where isinstance(x, int), equals x).
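
You can verify that detail directly in a CPython shell (other Python implementations may hash integers differently):

all(hash(x) == x for x in range(10))
# True, so partitionBy(10) sends each key x straight to partition x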

answered Feb 07 '23 by zero323