I am trying to build a minimal working example of repartitionAndSortWithinPartitions
in order to understand the function. I have got this far (it is not working: the distinct throws the values around so that they end up out of order):
def partval(partID: Int, iter: Iterator[Int]): Iterator[Tuple2[Int, Int]] = {
  iter.map(x => (partID, x)).toList.iterator
}
val part20to3_chaos = sc.parallelize(1 to 20, 3).distinct
val part20to2_sorted = part20to3_chaos.repartitionAndSortWithinPartitions(2)
part20to2_sorted.mapPartitionsWithIndex(partval).collect
but I get the error:
Name: Compile Error
Message: <console>:22: error: value repartitionAndSortWithinPartitions is not a member of org.apache.spark.rdd.RDD[Int]
val part20to2_sorted = part20to3_chaos.repartitionAndSortWithinPartitions(2)
I tried using the scaladoc, but wasn't able to find which class provides repartitionAndSortWithinPartitions. (Btw: this scaladoc is not impressive: why is MapPartitionsRDD missing? How can I search for a method?)
Realising I need a partitioner object, I next tried:
val rangePartitioner = new org.apache.spark.RangePartitioner(2, part20to3_chaos)
val part20to2_sorted = part20to3_chaos.repartitionAndSortWithinPartitions(rangePartitioner)
part20to2_sorted.mapPartitionsWithIndex(partval).collect
but got
Name: Compile Error
Message: <console>:22: error: type mismatch;
found : org.apache.spark.rdd.RDD[Int]
required: org.apache.spark.rdd.RDD[_ <: Product2[?,?]]
Error occurred in an application involving default arguments.
val rangePartitioner = new org.apache.spark.RangePartitioner(2, part20to3_chaos)
How do I get this to compile? Could I get a working example, please?
Your problem is that part20to3_chaos is an RDD[Int], while OrderedRDDFunctions.repartitionAndSortWithinPartitions is a method that operates on an RDD[(K, V)], where K is the key and V is the value.
repartitionAndSortWithinPartitions will first repartition the data based on the provided partitioner, and then sort by the key:
/**
* Repartition the RDD according to the given partitioner and,
* within each resulting partition, sort records by their keys.
*
* This is more efficient than calling `repartition` and then sorting within each partition
* because it can push the sorting down into the shuffle machinery.
*/
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
}
So it looks like it's not exactly what you're looking for.
If you want a plain old sort, you can use sortBy, as it doesn't require a key:
scala> val toTwenty = sc.parallelize(1 to 20, 3).distinct
toTwenty: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at distinct at <console>:33
scala> val sorted = toTwenty.sortBy(identity, true, 3).collect
sorted: Array[Int] =
Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
Here you pass sortBy the keying function (identity in this case), the sort order (ascending or descending), and the number of partitions you want to create.
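That said, if you do want to call repartitionAndSortWithinPartitions on your data, here is a minimal sketch of how the original example could be made to compile, assuming you key each element with itself (the names keyed and sortedWithin are just illustrative):

// Key each element with itself so the RDD becomes an RDD[(Int, Int)],
// which makes OrderedRDDFunctions (and hence the method) available.
val keyed = sc.parallelize(1 to 20, 3).distinct.map(x => (x, x))

// RangePartitioner expects a pair RDD, so this now type-checks.
val rangePartitioner = new org.apache.spark.RangePartitioner(2, keyed)

// Repartition into 2 partitions and sort each partition by key.
val sortedWithin = keyed.repartitionAndSortWithinPartitions(rangePartitioner)

// Tag each element with its partition index, similar to partval above
// (partval itself expects an Iterator[Int], so an inline function is used here).
sortedWithin
  .mapPartitionsWithIndex((partId, iter) => iter.map { case (k, _) => (partId, k) })
  .collect()
// Expected (roughly): the lower half of the keys in partition 0 and the upper half
// in partition 1, each in ascending order; the exact split point depends on
// RangePartitioner's sampling.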
Let me try to explain repartitionAndSortWithinPartitions through PySpark.
Suppose you have a dataset in pair form:
pairs = sc.parallelize([["a",1], ["b",2], ["c",3], ["d",3]])
pairs.collect()
# Output [['a', 1], ['b', 2], ['c', 3], ['d', 3]]
pairs.repartitionAndSortWithinPartitions(2).glom().collect()
# Output [[('a', 1), ('c', 3)], [('b', 2), ('d', 3)]]
Through repartitionAndSortWithinPartitions() we asked for the data to be reshuffled into 2 partitions, and that's exactly what we get: 'a' and 'c' in one partition, 'b' and 'd' in the other, with the keys sorted within each partition.
We can also repartition and sort based on a custom condition, e.g.
pairs.repartitionAndSortWithinPartitions(2,
partitionFunc=lambda x: x == 'a').glom().collect()
# Output [[('b', 2), ('c', 3), ('d', 3)], [('a', 1)]]
As expected, we have two partitions: one with three key-value pairs, sorted, and one with just ('a', 1). To learn more about glom, refer to this link.
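For reference, the same idea in Scala (the language of the original question) would use a custom Partitioner. This is only a sketch, and IsAPartitioner is a made-up name mirroring the partitionFunc lambda above:

import org.apache.spark.Partitioner

// Hypothetical partitioner that mirrors partitionFunc above:
// key "a" goes to partition 1, every other key to partition 0.
class IsAPartitioner extends Partitioner {
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int = if (key == "a") 1 else 0
}

val pairsScala = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("d", 3)))
pairsScala.repartitionAndSortWithinPartitions(new IsAPartitioner).glom().collect()
// Expected: Array(Array((b,2), (c,3), (d,3)), Array((a,1)))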