
How to use Spark's repartitionAndSortWithinPartitions?

I am trying to build a minimal working example of repartitionAndSortWithinPartitions in order to understand the function. This is what I have so far (it doesn't work as intended; distinct shuffles the values around so that they end up out of order):

def partval(partID:Int, iter: Iterator[Int]): Iterator[Tuple2[Int, Int]] = {
  iter.map( x => (partID, x)).toList.iterator
}

val part20to3_chaos = sc.parallelize(1 to 20, 3).distinct
val part20to2_sorted = part20to3_chaos.repartitionAndSortWithinPartitions(2)
part20to2_sorted.mapPartitionsWithIndex(partval).collect

but I get the error

Name: Compile Error
Message: <console>:22: error: value repartitionAndSortWithinPartitions is not a member of org.apache.spark.rdd.RDD[Int]
             val part20to2_sorted = part20to3_chaos.repartitionAndSortWithinPartitions(2)

I tried using the scaladoc, but wasn't able to find which class provides repartitionAndSortWithinPartitions. (Btw: This scaladoc is not impressive: Why is MapPartitionsRDD missing? How can I search for a method?)

Realising that I need a partitioner object, I next tried

val rangePartitioner = new org.apache.spark.RangePartitioner(2, part20to3_chaos)
val part20to2_sorted = part20to3_chaos.repartitionAndSortWithinPartitions(rangePartitioner)
part20to2_sorted.mapPartitionsWithIndex(partval).collect

but got

Name: Compile Error
Message: <console>:22: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[Int]
 required: org.apache.spark.rdd.RDD[_ <: Product2[?,?]]
Error occurred in an application involving default arguments.
         val rPartitioner = new org.apache.spark.RangePartitioner(2, part20to3_chaos)

How do I get this to compile? Could I get a working example, please?

Make42 asked May 14 '16

2 Answers

Your problem is that part20to3_chaos is an RDD[Int], while OrderedRDDFunctions.repartitionAndSortWithinPartitions is a method which operates on an RDD[(K, V)], where K is the key and V is the value.

repartitionAndSortWithinPartitions will first repartition the data based on the provided partitioner, and then sort by the key:

/**
 * Repartition the RDD according to the given partitioner and, 
 * within each resulting partition, sort records by their keys.
 *
 * This is more efficient than calling `repartition` and then sorting within each partition
 * because it can push the sorting down into the shuffle machinery.
 */
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
}
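The effect of that method can be sketched without Spark: assign each (key, value) pair to a partition via a partitioner function, then sort every partition by key. Here is a minimal pure-Python model (a hypothetical helper for illustration, not part of any Spark API):

```python
def repartition_and_sort(pairs, num_partitions, partition_func=hash):
    """Model of repartitionAndSortWithinPartitions: bucket by key, sort each bucket."""
    # assign every (key, value) pair to a partition via the partitioner
    partitions = [[] for _ in range(num_partitions)]
    for k, v in pairs:
        partitions[partition_func(k) % num_partitions].append((k, v))
    # sort records within each partition by key, as Spark's shuffle machinery does
    return [sorted(p) for p in partitions]

# keys 3 and 1 land in partition 1, key 2 in partition 0; each bucket is sorted
repartition_and_sort([(3, "a"), (1, "b"), (2, "c")], 2)
# -> [[(2, 'c')], [(1, 'b'), (3, 'a')]]
```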

So it looks like it's not exactly what you're looking for.

If you want a plain old sort, you can use sortBy, as it doesn't require a key:

scala> val toTwenty = sc.parallelize(1 to 20, 3).distinct
toTwenty: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at distinct at <console>:33

scala> val sorted = toTwenty.sortBy(identity, true, 3).collect
sorted: Array[Int] = 
    Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)

Here you pass sortBy the key function (identity in this case), whether the order is ascending, and the number of partitions you want to create.
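Note the difference in guarantees: sortBy produces a total order across the whole RDD, whereas repartitionAndSortWithinPartitions only orders records inside each partition. A pure-Python sketch of the distinction (not Spark API; a modulo partitioner is assumed here purely for illustration):

```python
data = [5, 3, 18, 9, 12, 1]
num_partitions = 2

# per-partition sort: hash-partition first, then sort each partition independently
parts = [[] for _ in range(num_partitions)]
for x in data:
    parts[x % num_partitions].append(x)
within = [sorted(p) for p in parts]  # ordered inside each partition only
# -> [[12, 18], [1, 3, 5, 9]]

# total sort: one global ordering, like sortBy(identity)
total = sorted(data)
# -> [1, 3, 5, 9, 12, 18]
```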

Yuval Itzchakov answered Sep 17 '22

Let me try to explain repartitionAndSortWithinPartitions through PySpark.

Suppose you have a dataset of key-value pairs:

pairs = sc.parallelize([["a", 1], ["b", 2], ["c", 3], ["d", 3]])

pairs.collect() 
# Output [['a', 1], ['b', 2], ['c', 3], ['d', 3]]
pairs.repartitionAndSortWithinPartitions(2).glom().collect() 
# Output [[('a', 1), ('c', 3)], [('b', 2), ('d', 3)]]

Through repartitionAndSortWithinPartitions() we asked for the data to be reshuffled into 2 partitions, and that's exactly what we get: 'a' and 'c' in one partition, 'b' and 'd' in the other. Within each partition, the records are sorted by key.

We can also repartition and sort based on a custom condition, e.g.

pairs.repartitionAndSortWithinPartitions(2, 
                                         partitionFunc=lambda x: x == 'a').glom().collect()
# Output [[('b', 2), ('c', 3), ('d', 3)], [('a', 1)]]

As expected, we have two partitions: one with three key-value pairs, sorted by key, and one containing only ('a', 1). (glom() collects the elements of each partition into a list, which makes the partition boundaries visible.)
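The partitionFunc example above can be modeled in plain Python (a sketch of the semantics, not the Spark API): the boolean returned by the lambda is coerced to a partition index, so 'a' lands in partition 1 and everything else in partition 0, and each partition is then sorted by key:

```python
pairs = [("a", 1), ("b", 2), ("c", 3), ("d", 3)]
num_partitions = 2
partition_func = lambda k: k == "a"  # True -> index 1, False -> index 0

# bucket each pair by the partitioner's result, then sort within each bucket
buckets = [[] for _ in range(num_partitions)]
for k, v in pairs:
    buckets[int(partition_func(k)) % num_partitions].append((k, v))
result = [sorted(b) for b in buckets]
# -> [[('b', 2), ('c', 3), ('d', 3)], [('a', 1)]]
```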

Somum answered Sep 19 '22