Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark: Sort records in groups?

I have a set of records which I need to:

1) Group by 'date', 'city' and 'kind'

2) Sort every group by 'prize'

In my code:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/** Minimal reproduction: group records by (day, kind, city), then try to sort each group by prize. */
object Sort {

  // One input row; `prize` is the field each group is meant to be sorted by.
  case class Record(name:String, day: String, kind: String, city: String, prize:Int)

  // Sample data: two distinct (day, kind, city) combinations with three records each.
  val recs = Array (
      Record("n1", "d1", "k1", "c1", 10),
      Record("n1", "d1", "k1", "c1", 9),
      Record("n1", "d1", "k1", "c1", 8),
      Record("n2", "d2", "k2", "c2", 1),
      Record("n2", "d2", "k2", "c2", 2),
      Record("n2", "d2", "k2", "c2", 3)
      )

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Test")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val rs = sc.parallelize(recs)
    // Group by the (day, kind, city) triple, then keep only the grouped records;
    // the grouping key itself is dropped by `.map(_._2)`.
    val rsGrp = rs.groupBy(r => (r.day, r.kind, r.city)).map(_._2)
    // Turn every group into a List of (prize, record) pairs. Each RDD element is
    // therefore a whole List, not an individual (key, value) pair.
    val x = rsGrp.map{r => 
      val lst = r.toList
      lst.map{e => (e.prize, e)}
      }
    // Does not compile: x is an RDD[List[(Int, Record)]], and the compiler reports
    // that sortByKey is not a member of that type.
    x.sortByKey()
  }

}

When I try to sort each group, I get an error:

value sortByKey is not a member of org.apache.spark.rdd.RDD[List[(Int, 
 Sort.Record)]]

What is wrong? How to sort?

like image 427
zork Avatar asked Feb 16 '15 14:02

zork


1 Answers

You need to define a key, group by it, and then use mapValues to sort the values of each group.

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._

  /** Groups records by (day, city, kind) and sorts the records of every group by prize. */
  object Sort {

    /** One input row; `prize` is the field used to order records inside a group. */
    case class Record(name:String, day: String, kind: String, city: String, prize:Int)

    // Sample data: two (day, city, kind) groups with three records each.
    val recs: Array[Record] = Array(
      Record("n1", "d1", "k1", "c1", 10),
      Record("n1", "d1", "k1", "c1", 9),
      Record("n1", "d1", "k1", "c1", 8),
      Record("n2", "d2", "k2", "c2", 1),
      Record("n2", "d2", "k2", "c2", 2),
      Record("n2", "d2", "k2", "c2", 3)
    )

    /**
     * Runs the job on a local master and prints one line per group:
     * the (day, city, kind) key followed by that group's records in ascending prize order.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf()
        .setAppName("Test")
        .setMaster("local")
        .set("spark.executor.memory", "2g")
      val sc = new SparkContext(conf)
      try {
        val rs = sc.parallelize(recs)

        // keyBy produces the pair RDD that groupByKey requires; the key is the
        // (day, city, kind) triple.
        val grouped: RDD[((String, String, String), Iterable[Record])] =
          rs.keyBy(r => (r.day, r.city, r.kind)).groupByKey()

        // Sort inside each group. mapValues leaves the keys untouched, so every
        // group is sorted independently of the others.
        val sorted: RDD[((String, String, String), List[Record])] =
          grouped.mapValues(_.toList.sortBy(_.prize))

        // Print result
        sorted.collect().foreach(println)
      } finally {
        // Always release the context, even if the job fails.
        sc.stop()
      }
    }
  }
like image 142
gasparms Avatar answered Sep 24 '22 03:09

gasparms