Method 1: Using sortBy()
sortBy() is an RDD method that sorts data by value efficiently; it is available on RDDs in both the Scala and the pyspark APIs. It takes a function (in pyspark, typically a lambda expression) that extracts the sort key from each element.
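As a minimal sketch (the Scala form is shown here and the sample data is hypothetical):
val counts = sc.parallelize(Seq(("apple", 3), ("pear", 1), ("fig", 5)))  // made-up (word, count) pairs
val byValueAsc = counts.sortBy(_._2)                     // ascending by the count field
val byValueDesc = counts.sortBy(_._2, ascending = false) // descending by the count field
byValueDesc.collect().foreach(println)                   // (fig,5), (apple,3), (pear,1)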
In Spark, you can use either the sort() or orderBy() function of a DataFrame/Dataset to sort by ascending or descending order on one or more columns; you can also sort using Spark SQL's sorting functions. In this article, I will explain all these different ways using Scala examples.
We can use either the orderBy() or sort() method to sort the data in a DataFrame. Pass asc() to sort a column in ascending order, or desc() for descending order. We can do this on a single column or on multiple columns, as in the sketch below.
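Here is a hedged sketch of both forms (it assumes a SparkSession named spark, as in spark-shell; the column names and data are hypothetical):
import org.apache.spark.sql.functions.{asc, desc}
import spark.implicits._  // pre-imported in spark-shell

val df = Seq(("apple", 3), ("pear", 1), ("fig", 5)).toDF("word", "count")
df.sort(asc("count")).show()                  // single column, ascending
df.orderBy(desc("count"), asc("word")).show() // multiple columns: count descending, then word ascending

// the same sort expressed with Spark SQL
df.createOrReplaceTempView("word_counts")
spark.sql("SELECT word, `count` FROM word_counts ORDER BY `count` DESC").show()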
Sorting should usually be done before collect() is called, since collect() returns the dataset to the driver program; this is also the way a Hadoop map-reduce job would be programmed in Java, so that the final output is written (typically) to HDFS. With the Spark API this approach gives the flexibility of writing the output in "raw" form wherever you want, such as to a file that can be used as input for further processing.
Using Spark's Scala API, sorting before collect() can be done following eliasah's suggestion, using Tuple2.swap() twice: once before sorting and once after, to produce a list of tuples sorted in increasing or decreasing order of their second field (named _2), which holds the count of occurrences of the word in their first field (named _1). Below is an example of how this is scripted in spark-shell:
// this whole block can be pasted in spark-shell in :paste mode followed by <Ctrl>D
val file = sc.textFile("some_local_text_file_pathname")
val wordCounts = file.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _, 1)  // 2nd arg configures one task (same as number of partitions)
  .map(item => item.swap) // interchanges position of entries in each tuple
  .sortByKey(true, 1)     // 1st arg configures ascending sort, 2nd arg configures one task
  .map(item => item.swap)
In order to reverse the ordering of the sort, use sortByKey(false, 1), since its first argument is the boolean value of ascending. Its second argument is the number of tasks (equivalent to the number of partitions), which is set to 1 for testing with a small input file where only one output data file is desired; reduceByKey also takes this optional argument.
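For example, here is the same pipeline with the sort reversed (a sketch reusing the file RDD from the block above):
val wordCountsDesc = file.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _, 1)
  .map(item => item.swap)
  .sortByKey(false, 1) // false configures the descending sort
  .map(item => item.swap)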
After this, the wordCounts RDD can be saved as text files to a directory with saveAsTextFile(directory_pathname). Into that directory will be deposited: one or more part-xxxxx files (starting with part-00000), depending on the number of reducers configured for the job (one output data file per reducer); a _SUCCESS file, if the job succeeded; and .crc checksum files.
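For example (the directory pathname here is hypothetical):
// a single part-00000 file results here because one reducer task was configured
wordCounts.saveAsTextFile("some_output_directory_pathname")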
Using pyspark, a Python script very similar to the Scala script shown above produces output that is effectively the same. Here is the pyspark version, demonstrating sorting a collection by value:
file = sc.textFile("file:some_local_text_file_pathname")
wordCounts = (file.flatMap(lambda line: line.strip().split(" "))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b, 1)     # last arg configures one reducer task
    .map(lambda pair: (pair[1], pair[0]))   # swap (word, count) to (count, word)
    .sortByKey(True, 1)                     # 1st arg configures ascending sort, 2nd configures 1 task
    .map(lambda pair: (pair[1], pair[0])))  # swap back to (word, count)
In order to sortByKey in descending order, pass False as its first argument. Since splitting on spaces in Python would otherwise keep leading and trailing whitespace as data, strip() is inserted before splitting each line; this is not necessary in spark-shell/Scala.
The main difference in the output of the Scala and Python versions of wordCount is that where Scala outputs (word,3), Python outputs (u'word', 3).
For more information on Spark RDD methods see http://spark.apache.org/docs/1.1.0/api/python/pyspark.rdd.RDD-class.html for Python and https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.rdd.RDD for Scala.
In the spark-shell, running collect() on wordCounts transforms it from an RDD into an Array[(String, Int)], that is, an array of Tuple2[String, Int] elements, which can itself be sorted on the second field of each Tuple2 element using:
wordCounts.collect().sortBy(_._2)
sortBy also takes an optional implicit math.Ordering argument, as Romeo Kienzler showed in a previous answer to this question. Calling sortBy(_._2) on the collected array will do a reverse sort of its Tuple2 elements on their _2 fields simply by defining an implicit reverse Ordering before running the map-reduce script, because that Ordering overrides the pre-existing Ordering of Int. The reverse Int Ordering already defined by Romeo Kienzler is:
// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
  override def compare(a: Int, b: Int) = a.compare(b) * (-1)
}
Another common way to define this reverse Ordering is to swap a and b and drop the (-1) on the right-hand side of the compare definition:
// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
  override def compare(a: Int, b: Int) = b.compare(a)
}
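To illustrate, with either implicit Ordering in scope the same sortBy call now sorts in descending order; this sketch reuses wordCounts from the spark-shell example above:
// the implicit reverse Ordering[Int] overrides the default, so the
// collected (word, count) tuples come back with the highest counts first
val sortedDesc: Array[(String, Int)] = wordCounts.collect().sortBy(_._2)
sortedDesc.take(5).foreach(println)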
Doing it in a more Pythonic way:
# In descending order
# (the first parameter gives the number of elements to be present in the output)
data.takeOrdered(10, key=lambda x: -x[1])

# In ascending order
data.takeOrdered(10, key=lambda x: x[1])
For those looking to get the top N elements ordered by value:
theRDD.takeOrdered(N, lambda kv: -1 * len(kv[1]))
if you wish to order by string length.
On the other hand, if the values are already in a form suitable for your desired ordering, then:
theRDD.takeOrdered(N, lambda kv: -1 * kv[1])
would suffice.
You can do it this way:
// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
  override def compare(a: Int, b: Int) = a.compare(b) * (-1)
}
counts.collect.toSeq.sortBy(_._2)
So basically you convert your RDD to a Seq and use its sortBy method to sort it. The implicit val above overrides the default Ordering[Int] wherever it is in scope, which is what turns this into a descending sort.