
Scala Spark: reduce by key and find the most common value

I have a file of CSV data stored as a SequenceFile on HDFS, in the format name, zip, country, fav_food1, fav_food2, fav_food3, fav_colour. There can be many entries with the same name, and I need to find out what their favourite food is (i.e. count all the food entries across all the records with that name and return the most popular one). I am new to Scala and Spark and have gone through multiple tutorials and scoured the forums, but am stuck on how to proceed. So far I have converted the sequence file's Text values into String format and then filtered out the other entries.

Here are the sample data entries, one per line in the file:

Bob,123,USA,Pizza,Soda,,Blue
Bob,456,UK,Chocolate,Cheese,Soda,Green
Bob,12,USA,Chocolate,Pizza,Soda,Yellow
Mary,68,USA,Chips,Pasta,Chocolate,Blue

So the output should be the tuple (Bob, Soda), since Soda appears the most times across Bob's entries.

import org.apache.hadoop.io._

var lines = sc.sequenceFile("path", classOf[LongWritable], classOf[Text]).values.map(x => x.toString())
// converted to String since I could not get filter to run on Text; .values also drops the LongWritable key

var filtered = lines.filter(_.split(",")(0) == "Bob")
// removed the entries for all other users

var f_tuples = filtered.map(line => line.split(","))
// split each line into its fields

var f_simple = f_tuples.map(fields => (fields(0), (fields(3), fields(4), fields(5))))
// removed the unnecessary fields, keeping only the name and the three foods

The issue I have now is that I think I have this [<name, [f, f, f]>] structure and don't really know how to proceed to flatten it out and get the most popular food. I need to combine all the entries so I have one entry per name, and then get the most common element in the value. Any help would be appreciated. Thanks.

I tried this to flatten it out, but it seems the more I try, the more convoluted the data structure becomes.

var f_trial = f_simple.groupBy(_._1).mapValues(_.map(_._2))
// the resulting structure was of type org.apache.spark.rdd.RDD[(String, Iterable[(String, String, String)])]

Here is what a println of a record looks like after f_trial:

("Bob", List((Pizza, Soda,), (Chocolate, Cheese, Soda), (Chocolate, Pizza, Soda)))

Parenthesis breakdown:

("Bob",
  List(
    (Pizza, Soda, <missing value>),
    (Chocolate, Cheese, Soda),
    (Chocolate, Pizza, Soda)
  ) // ends the List
) // ends the outer tuple
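
For reference, one way to continue from exactly this structure (a sketch added for clarity, not part of the original question; the name favourite is hypothetical):

    var favourite = f_trial.mapValues { foods =>
      foods.flatMap { case (a, b, c) => List(a, b, c) } // flatten the triples into one food list
           .filter(_.nonEmpty)                          // drop the missing food slots
           .groupBy(identity)                           // food -> all of its occurrences
           .maxBy(_._2.size)._1                         // keep the most frequent food
    }
    // favourite: RDD[(String, String)], e.g. (Bob, Soda)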
asked by VSEWHGHP

2 Answers

I found time. Setup:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf

    val conf = new SparkConf().setAppName("spark-scratch").setMaster("local")
    val sc = new SparkContext(conf)

    val data = """   
  Bob,123,USA,Pizza,Soda,,Blue
  Bob,456,UK,Chocolate,Cheese,Soda,Green
  Bob,12,USA,Chocolate,Pizza,Soda,Yellow
  Mary,68,USA,Chips,Pasta,Chocolate,Blue
  """.trim

    val records = sc.parallelize(data.split('\n'))

Extract the food choices, and for each make a tuple of ((name, food), 1)

    val r2 = records.flatMap { r =>
      val Array(name, id, country, food1, food2, food3, color) = r.split(',')
      List(((name, food1), 1), ((name, food2), 1), ((name, food3), 1))
        .filter(_._1._2.nonEmpty) // skip blank food slots, e.g. Bob's missing fav_food3
    }

Total up each name/food combination:

    val r3 = r2.reduceByKey((x, y) => x + y)

Remap so that the name (only) is the key

    val r4 = r3.map { case ((name, food), total) => (name, (food, total)) }

Pick the food with the largest count at each step

    val res = r4.reduceByKey((x, y) => if (y._2 > x._2) y else x)

And we're done

    println(res.collect().mkString)
    //(Mary,(Chips,1))(Bob,(Soda,3))

EDIT: To collect all the food items that have the same top count for a person, we just change the last two lines:

Start with a List of items with total:

    val r5 = r3.map { case ((name, food), total) => (name, (List(food), total)) }

In the equal case, concatenate the list of food items with that score

    val res2 = r5.reduceByKey((x, y) => if (y._2 > x._2) y
                                        else if (y._2 < x._2) x
                                        else (y._1 ::: x._1, y._2))

    //(Mary,(List(Chocolate, Pasta, Chips),1))
    //(Bob,(List(Soda),3))

If you want, say, the top 3, then use aggregateByKey to assemble a list of the favorite foods per person instead of the second reduceByKey, along these lines:
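
Here is a minimal sketch of that idea (added for illustration, building on the r4 RDD[(String, (String, Int))] from above; the name top3 is hypothetical):

    val top3 = r4.aggregateByKey(List.empty[(String, Int)])(
      (acc, v) => (v :: acc).sortBy(-_._2).take(3), // fold one (food, count) pair into a partition-local top list
      (a, b) => (a ::: b).sortBy(-_._2).take(3)     // merge the top lists from different partitions
    )
    println(top3.collect().mkString)

Keeping the per-key accumulator capped at three elements is what makes this cheaper than collecting all counts per person and sorting afterwards.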

answered by The Archetypal Paul

The solutions provided by Paul and mattinbits shuffle your data twice: once to reduce by name-and-food and once to reduce by name. It is possible to solve this problem with only one shuffle.

/** Generate a key -> food-count map pair from a split line. */
def bitsToKeyMapPair(xs: Array[String]): (String, Map[String, Long]) = {
  val key = xs(0)
  val map = xs
    .drop(3) // Drop name..country
    .take(3) // Take the food fields
    .filter(_.trim.nonEmpty) // Ignore empty entries
    .map((_, 1L)) // Generate k-v pairs
    .toMap // Convert to Map
    .withDefaultValue(0L) // Default count for missing keys

  (key, map)
}

/** Combine two count maps by adding the counts for each key. */
def combine(m1: Map[String, Long], m2: Map[String, Long]): Map[String, Long] = {
  (m1.keys ++ m2.keys).map(k => (k, m1(k) + m2(k))).toMap.withDefaultValue(0L)
}

val n: Int = ??? // Number of favorite foods to keep per user

val records = lines.map(line => bitsToKeyMapPair(line.split(",")))
records.reduceByKey(combine).mapValues(_.toSeq.sortBy(-_._2).take(n))

If you're not a purist you can replace scala.collection.immutable.Map with scala.collection.mutable.Map to further improve performance.
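
A hedged sketch of what that mutable variant of combine could look like (an assumption for illustration, not part of the original answer; combineMutable is a hypothetical name):

    import scala.collection.mutable

    // Fold m2's counts into m1 in place instead of rebuilding an
    // immutable Map on every merge.
    def combineMutable(m1: mutable.Map[String, Long],
                       m2: mutable.Map[String, Long]): mutable.Map[String, Long] = {
      m2.foreach { case (k, v) => m1(k) = m1.getOrElse(k, 0L) + v }
      m1
    }

Spark's aggregateByKey explicitly allows its functions to modify and return their first argument; with reduceByKey the same pattern generally works for map-like accumulators, but treat that as an assumption to verify against the Spark version in use.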

answered by zero323