 

How does Spark decide how to partition an RDD?

Suppose I create such an RDD (I am using Pyspark):

list_rdd = sc.parallelize(xrange(0, 20, 2), 6)

then I print the partitioned elements with the glom() method and obtain

[[0], [2, 4], [6, 8], [10], [12, 14], [16, 18]]

How has Spark decided how to partition my list? Where does that specific choice of the elements come from? It could have coupled them differently, leaving some other elements than 0 and 10 alone, to create the 6 requested partitions. At a second run, the partitions are the same.

Using a larger range with 15 elements, I get partitions in a pattern of two elements followed by three elements:

list_rdd = sc.parallelize(xrange(0, 30, 2), 6)
[[0, 2], [4, 6, 8], [10, 12], [14, 16, 18], [20, 22], [24, 26, 28]]

Using a smaller range with only 5 elements, I get

list_rdd = sc.parallelize(xrange(0, 10, 2), 6)
[[], [0], [2], [4], [6], [8]]

So what I infer is that Spark generates the partitions by splitting the list into a repeating pattern in which a smaller group is followed by one or more larger ones.

The question is whether there is a reason behind this choice. It is very elegant, but does it also provide performance advantages?

Asked Mar 04 '16 by mar tin

People also ask

How is RDD partitioned?

Apache Spark's Resilient Distributed Datasets (RDD) are a collection of various data that are so big in size, that they cannot fit into a single node and should be partitioned across various nodes. Apache Spark automatically partitions RDDs and distributes the partitions across different nodes.
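
As a quick illustration (a minimal PySpark sketch, assuming an existing SparkContext sc as in the question), you can check how many partitions Spark created and inspect their contents with glom():

# Without an explicit numSlices, Spark picks the partition count itself
# (sc.defaultParallelism, typically one per core on a local[*] master).
rdd = sc.parallelize(range(100))

print(rdd.getNumPartitions())   # how many partitions the RDD was cut into
print(rdd.glom().collect())     # which elements landed in which partition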

How many partitions should a Spark RDD have?

One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster.
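
For example (a sketch assuming an existing SparkContext sc; the factor of 3 is only an illustration of the 2-4 partitions per CPU rule of thumb):

cores = sc.defaultParallelism                            # roughly the number of available cores
rdd = sc.parallelize(range(1000), numSlices=cores * 3)   # aim for 2-4 partitions per CPU
print(rdd.getNumPartitions())                            # -> cores * 3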

How do I choose a partition key for Spark?

You should partition by a field that you need to filter by frequently and that has low cardinality, i.e. it will create a relatively small number of directories, each holding a relatively large amount of data.
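
For instance (a sketch for the DataFrame API, assuming a hypothetical DataFrame df with a low-cardinality country column, an existing SparkSession spark, and an illustrative output path):

# Write one directory per country; country is low-cardinality and frequently filtered on.
df.write.partitionBy("country").parquet("/tmp/events_by_country")

# Queries that filter on the partition column can skip whole directories on read.
spark.read.parquet("/tmp/events_by_country").where("country = 'DE'").show()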

How does hash partitioning work in Spark?

Spark Default Partitioner: the Hash Partitioner works with the hashCode() function. The idea behind hashCode() is that equal objects have the same hash code. On the basis of this concept, the Hash Partitioner groups keys with the same hash code into the same partition and distributes the keys across the partitions.
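
A small PySpark sketch of this (assuming an existing SparkContext sc; for pair RDDs, partitionBy hashes the key and takes it modulo the partition count, so equal keys always land in the same partition):

pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("c", 4)])
partitioned = pairs.partitionBy(2)        # key -> hash(key) % 2 decides the partition
print(partitioned.glom().collect())       # all ("a", ...) records share one partition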


1 Answer

Unless you specify a specific partitioner, this is "random" in the sense that it depends on the specific implementation of that RDD. In this case you can head to ParallelCollectionRDD to dig into it further.

getPartitions is defined as:

val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray

where slice is commented as (reformatted to fit better):

/**
* Slice a collection into numSlices sub-collections. 
* One extra thing we do here is to treat Range collections specially, 
* encoding the slices as other Ranges to minimize memory cost. 
* This makes it efficient to run Spark over RDDs representing large sets of numbers. 
* And if the collection is an inclusive Range, 
* we use inclusive range for the last slice.
*/

Note that there are some considerations with regard to memory. So, again, this is going to be specific to the implementation.
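
For intuition, here is a small Python sketch (a simplification I am assuming from the Scala source, ignoring the special Range encoding) of the positional slicing that slice performs: slice i covers indices from i * length // numSlices up to (i + 1) * length // numSlices.

def slice_positions(length, num_slices):
    # Simplified boundaries used by ParallelCollectionRDD.slice:
    # slice i spans [i * length // num_slices, (i + 1) * length // num_slices)
    for i in range(num_slices):
        yield (i * length) // num_slices, ((i + 1) * length) // num_slices

data = list(range(0, 20, 2))  # the 10 elements from the question
print([data[start:end] for start, end in slice_positions(len(data), 6)])
# [[0], [2, 4], [6, 8], [10], [12, 14], [16, 18]] -- matches the observed glom() output

With floor division the slice sizes can only differ by one element, which explains the 1-2 and 2-3 patterns in the question: the partitions are kept as even as possible, and for Range inputs each slice is itself stored as a Range rather than a materialized list, which is where the memory saving mentioned in the comment comes from.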

Answered by Justin Pihony