Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to Define Custom partitioner for Spark RDDs of equally sized partition where each partition has equal number of elements?

I am new to Spark. I have a large dataset of elements[RDD] and I want to divide it into two exactly equal sized partitions maintaining order of elements. I tried using RangePartitioner like

var data = partitionedFile.partitionBy(new RangePartitioner(2, partitionedFile)) 

This doesn't give a satisfactory result because it divides roughly but not exactly equal sized maintaining order of elements. For example if there are 64 elements, we use Rangepartitioner, then it divides into 31 elements and 33 elements.

I need a partitioner such that I get exactly first 32 elements in one half and other half contains second set of 32 elements. Could you please help me by suggesting how to use a customized partitioner such that I get equally sized two halves, maintaining the order of elements?

like image 956
yh18190 Avatar asked Apr 17 '14 07:04

yh18190


People also ask

How can you create an RDD with specific partitioning?

The loaded rdd is partitioned by default partitioner: hash code. To specify custom partitioner, use can check rdd. partitionBy(), provided with your own partitioner.

What is custom partitioning in Spark?

Custom partitioning lets you alter the size and number of partitions as per your application's needs. Here, you can define which key should enter which partition. One should provide an explicit partitioner by calling partitionBy method on a paired RDD.

How many partitions does an RDD have?

As already mentioned above, one partition is created for each block of the file in HDFS which is of size 64MB. However, when creating a RDD a second argument can be passed that defines the number of partitions to be created for an RDD. The above line of code will create an RDD named textFile with 5 partitions.

How many ways can you create RDD in Spark?

There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.


1 Answers

Partitioners work by assigning a key to a partition. You would need prior knowledge of the key distribution, or look at all keys, to make such a partitioner. This is why Spark does not provide you with one.

In general you do not need such a partitioner. In fact I cannot come up with a use case where I would need equal-size partitions. What if the number of elements is odd?

Anyway, let us say you have an RDD keyed by sequential Ints, and you know how many in total. Then you could write a custom Partitioner like this:

class ExactPartitioner[V](     partitions: Int,     elements: Int)   extends Partitioner {    def getPartition(key: Any): Int = {     val k = key.asInstanceOf[Int]     // `k` is assumed to go continuously from 0 to elements-1.     return k * partitions / elements   } } 
like image 199
Daniel Darabos Avatar answered Sep 20 '22 07:09

Daniel Darabos