I am new to Spark. I have a large dataset of elements (an RDD) and I want to divide it into two exactly equal-sized partitions while maintaining the order of elements. I tried using RangePartitioner like this:
var data = partitionedFile.partitionBy(new RangePartitioner(2, partitionedFile))
This doesn't give a satisfactory result: it maintains the order of elements, but the partitions are only roughly, not exactly, equal in size. For example, with 64 elements, RangePartitioner divides them into partitions of 31 and 33 elements.
I need a partitioner such that one half contains exactly the first 32 elements and the other half contains the second 32. Could you suggest how to use a customized partitioner so that I get two equally sized halves, maintaining the order of elements?
The loaded RDD is partitioned by the default partitioner (hash code). To specify a custom partitioner, you can use rdd.partitionBy(), supplying your own partitioner.
Custom partitioning lets you alter the size and number of partitions to suit your application's needs: you define which key should go to which partition. You provide an explicit partitioner by calling the partitionBy method on a paired RDD.
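A minimal sketch of this (the paired RDD and the partition count are made up; sc is assumed to be an existing SparkContext):
import org.apache.spark.HashPartitioner

// A hypothetical paired RDD; the key decides the target partition.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
// Explicitly repartition by key hash into 4 partitions.
val repartitioned = pairs.partitionBy(new HashPartitioner(4))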
One partition is created for each block of the file in HDFS (a block being 64 MB). However, when creating an RDD, a second argument can be passed that defines the number of partitions to create; a call such as the one sketched below creates an RDD named textFile with 5 partitions.
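A minimal sketch (the HDFS path is a placeholder; sc is again an existing SparkContext):
// The second argument asks Spark for at least 5 partitions.
val textFile = sc.textFile("hdfs://host/path/to/file.txt", 5)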
There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
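A minimal sketch of the first approach, parallelizing a driver-side collection (the collection here is made up):
// Distribute a local collection across 2 partitions.
// For 64 elements this yields two contiguous slices of 32 each.
val data = sc.parallelize(1 to 64, 2)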
Partitioners work by assigning a key to a partition. You would need prior knowledge of the key distribution, or look at all keys, to make such a partitioner. This is why Spark does not provide you with one.
In general you do not need such a partitioner. In fact I cannot come up with a use case where I would need equal-size partitions. What if the number of elements is odd?
Anyway, let us say you have an RDD keyed by sequential Ints, and you know how many there are in total. Then you could write a custom Partitioner like this:
import org.apache.spark.Partitioner

class ExactPartitioner[V](partitions: Int, elements: Int) extends Partitioner {
  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[Int]
    // `k` is assumed to go continuously from 0 to elements - 1.
    k * partitions / elements
  }
}
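A hedged usage sketch (the data and counts are made up): key each element by a sequential index with zipWithIndex, then repartition with the ExactPartitioner above:
// 64 hypothetical elements, to be split into two halves of 32.
val elements = sc.parallelize(1 to 64)
// zipWithIndex gives a Long index; the partitioner expects an Int key.
val keyed = elements.zipWithIndex().map { case (v, i) => (i.toInt, v) }
val halves = keyed.partitionBy(new ExactPartitioner(2, 64))
// halves.glom().collect() shows two arrays of 32 elements each.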