
How to sort by value in Spark Java

JavaPairRDD<String, Float> counts = ones
            .reduceByKey(new Function2<Float, Float, Float>() {
                @Override
                public Float call(Float i1, Float i2) {
                    return i1 + i2;
                }
            });

My output looks like this:

id,value
100002,23.47
100003,42.78
200003,50.45
190001,30.23

I would like the output to be sorted by value in descending order, like this:

200003,50.45
100003,42.78
190001,30.23
100002,23.47

How do I achieve this?

Subramanyam S asked Mar 12 '15 06:03



2 Answers

Scala has a nice sortBy method. I could not find the Java equivalent, but this is the Scala implementation:

  def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.size)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
    this.keyBy[K](f)
        .sortByKey(ascending, numPartitions)
        .values

So it is basically similar to swapping the key and value, but it adds a key instead of swapping back and forth. I use it like this: .sortBy(_._2) (sort by picking the second element of the tuple).
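In Java, the same keyBy, sortByKey, values chain can be written by hand. The following is a minimal sketch rather than a built-in API: it assumes Java 8 lambdas, a local master, and the sample data from the question. The class name SortByValueKeyBy and the helper sortByValue are hypothetical names chosen for illustration.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

// Hypothetical class name, used only for this illustration.
public class SortByValueKeyBy {

    // Mirrors the Scala chain keyBy(f).sortByKey(ascending).values:
    // key each pair by its value, sort on that key, then drop the key again.
    static JavaRDD<Tuple2<String, Float>> sortByValue(
            JavaPairRDD<String, Float> counts, boolean ascending) {
        return counts
                .mapToPair(pair -> new Tuple2<>(pair._2(), pair)) // (value, (id, value))
                .sortByKey(ascending)
                .values();                                         // back to (id, value)
    }

    public static void main(String[] args) {
        // Assumption: run locally; replace the master for a real cluster.
        SparkConf conf = new SparkConf().setAppName("SortByValueKeyBy").setMaster("local[*]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Sample data taken from the question.
            List<Tuple2<String, Float>> data = Arrays.asList(
                    new Tuple2<>("100002", 23.47f),
                    new Tuple2<>("100003", 42.78f),
                    new Tuple2<>("200003", 50.45f),
                    new Tuple2<>("190001", 30.23f));
            JavaPairRDD<String, Float> counts = sc.parallelizePairs(data);

            // false = descending, which is the order the question asks for.
            for (Tuple2<String, Float> pair : sortByValue(counts, false).collect()) {
                System.out.println(pair._1() + "," + pair._2());
            }
        }
    }
}
```

The result is a JavaRDD of tuples rather than a JavaPairRDD. If pair operations are needed afterwards, JavaPairRDD.fromJavaRDD can wrap it again.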

Daniel Langdon answered Oct 03 '22 00:10



I don't think there is a specific API for sorting the data by value.

You may need to do the following steps:

1) Swap key and value
2) Use the sortByKey API
3) Swap key and value back

For more details about sortByKey, see the reference below:
https://spark.apache.org/docs/1.0.0/api/java/org/apache/spark/api/java/JavaPairRDD.html#sortByKey%28boolean%29

For the swap, we can use the Scala Tuple2 API:

http://www.scala-lang.org/api/current/index.html#scala.Tuple2

For example, suppose I have a JavaPairRDD produced by the function below.

JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
    }
});

Now, to swap key and value, you can use the code below:

JavaPairRDD<Integer, String> swappedPair = counts.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
    @Override
    public Tuple2<Integer, String> call(Tuple2<String, Integer> item) throws Exception {
        return item.swap();
    }
});
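The swap above covers only the first step. One way to finish is to sort on the swapped key and then swap back. This is a rough sketch under stated assumptions: Java 8 lambdas in place of the anonymous classes, a local master, and made-up sample data. The class name SortByValueSwap and the helper sortByValueDescending are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

// Hypothetical class name, used only for this illustration.
public class SortByValueSwap {

    // Swap, sort by the (former) value, then swap back.
    static JavaPairRDD<String, Integer> sortByValueDescending(
            JavaPairRDD<String, Integer> counts) {
        return counts
                .mapToPair(item -> item.swap())  // (key, value) -> (value, key)
                .sortByKey(false)                // false = descending order
                .mapToPair(item -> item.swap()); // (value, key) -> (key, value)
    }

    public static void main(String[] args) {
        // Assumption: run locally; replace the master for a real cluster.
        SparkConf conf = new SparkConf().setAppName("SortByValueSwap").setMaster("local[*]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Made-up sample counts, standing in for the reduceByKey result.
            List<Tuple2<String, Integer>> data = Arrays.asList(
                    new Tuple2<>("spark", 3),
                    new Tuple2<>("java", 7),
                    new Tuple2<>("scala", 5));
            JavaPairRDD<String, Integer> counts = sc.parallelizePairs(data);

            for (Tuple2<String, Integer> pair : sortByValueDescending(counts).collect()) {
                System.out.println(pair._1() + "," + pair._2());
            }
        }
    }
}
```

Unlike the keyBy variant, this keeps the result as a JavaPairRDD, at the cost of two extra map passes over the data.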

Hope this helps. You need to take care of the data types: the swap turns a JavaPairRDD<String, Integer> into a JavaPairRDD<Integer, String>.

Ramana answered Oct 03 '22 00:10
