
How to Reduce by key in "Scala" [Not In Spark]

I am trying to reduce values by key in plain Scala. Is there a method that reduces values based on their keys? (I know Spark provides reduceByKey, but how do we do the same without Spark?)

The input data is:

import scala.io.Source

val File = Source.fromFile("C:/Users/svk12/git/data/retail_db/order_items/part-00000")
                 .getLines()
                 .toList

val map = File.map(x => x.split(","))
              .map(x => (x(1), x(4)))

map.take(10).foreach(println)

After the above step, I get this result:

(2,250.0)
(2,129.99)
(4,49.98)
(4,299.95)
(4,150.0)
(4,199.92)
(5,299.98)
(5,299.95)

Expected result:

(2,379.99)
(5,599.93)
.......
Sai Mammahi asked Feb 09 '19 08:02


2 Answers

Starting with Scala 2.13, you can use the groupMapReduce method, which (as its name suggests) is equivalent to a groupBy followed by mapValues and a reduce step:

io.Source.fromFile("file.txt")
  .getLines().to(LazyList)
  .map(_.split(','))
  .groupMapReduce(_(1))(_(4).toDouble)(_ + _)

The groupMapReduce stage:

  • groups the split arrays by their 2nd element (_(1)) (group part of groupMapReduce)

  • maps each array within each group to its 5th element, converted to Double (_(4).toDouble) (map part of groupMapReduce)

  • reduces the values within each group by summing them (_ + _) (reduce part of groupMapReduce).
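As a minimal sketch of those three parts on in-memory data (Scala 2.13+): the rows below are made up for illustration, and only their columns 1 and 4 mirror the pairs printed in the question. The object name is hypothetical.

```scala
object GroupMapReduceDemo {
  // Made-up CSV rows (assumption): column 1 is the key, column 4 is the amount.
  val lines: List[String] = List(
    "1,2,x,1,250.0",
    "2,2,x,1,129.99",
    "3,4,x,1,49.98",
    "4,4,x,1,299.95"
  )

  val totals: Map[String, Double] =
    lines
      .map(_.split(','))                           // each line becomes Array[String]
      .groupMapReduce(_(1))(_(4).toDouble)(_ + _)  // group by key, parse amount, sum

  def main(args: Array[String]): Unit =
    totals.toSeq.sortBy(_._1).foreach(println)     // one (key, total) pair per key
}
```

Because the sums are floating-point, the printed totals may carry small rounding noise.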

groupMapReduce is a one-pass equivalent of:

seq.groupBy(_(1)).view.mapValues(_.map(_(4).toDouble).reduce(_ + _)).toMap

Also note the conversion from Iterator to LazyList: Iterator does not provide groupMapReduce, so we need a collection that does. (We don't use a Stream because, starting with Scala 2.13, LazyList is the recommended replacement for Stream.)
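On Scala versions before 2.13, where groupMapReduce is not available, a single pass can also be sketched with foldLeft. The helper name reduceByKey below is hypothetical (it is not a standard-library method); it just mimics the shape of Spark's operation on an ordinary collection of pairs.

```scala
object ReduceByKeyDemo {
  // Hypothetical helper: merges values that share a key, in a single pass.
  def reduceByKey[K, V](pairs: Iterable[(K, V)])(f: (V, V) => V): Map[K, V] =
    pairs.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) =>
      // Combine with the running value for k if present, otherwise start with v.
      acc.updated(k, acc.get(k).map(f(_, v)).getOrElse(v))
    }

  def main(args: Array[String]): Unit = {
    // Sample pairs taken from the output shown in the question.
    val pairs = List("2" -> 250.0, "2" -> 129.99, "4" -> 49.98)
    reduceByKey(pairs)(_ + _).foreach(println)
  }
}
```

The second parameter list keeps the call site close to Spark's: reduceByKey(pairs)(_ + _).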

Xavier Guihot answered Sep 20 '22 22:09


It looks like you want the sum of some values from a file. One problem is that file contents are read as strings, so you have to convert each String to a numeric type before it can be summed.

These are the steps you might use.

io.Source.fromFile("so.txt") //open file
  .getLines()                //read line-by-line
  .map(_.split(","))         //each line is Array[String]
  .toSeq                     //to something that can groupBy()
  .groupBy(_(1))             //now is Map[String,Seq[Array[String]]]
  .mapValues(_.map(_(4).toDouble).sum) //now is Map[String,Double]
  .toSeq                     //un-Map it to (String,Double) tuples
  .sortBy(_._1)              //presentation order (by key)
  .take(10)                  //sample
  .foreach(println)          //report

This will, of course, throw if any file data is not in the required format.
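If skipping bad lines is preferable to throwing, one possible sketch wraps the parsing in scala.util.Try. The names SafeSumDemo, parse, and sumByKey are made up for this example, and silently dropping malformed lines is an assumption about what you want; logging or collecting them may suit you better.

```scala
import scala.util.Try

object SafeSumDemo {
  // Parses one CSV line into (key, amount).
  // Returns None if a column is missing or the amount is not numeric.
  def parse(line: String): Option[(String, Double)] = {
    val cols = line.split(",")
    Try(cols(1) -> cols(4).toDouble).toOption
  }

  def sumByKey(lines: Seq[String]): Map[String, Double] =
    lines
      .flatMap(line => parse(line))   // drop malformed lines
      .groupBy(_._1)                  // group the pairs by key
      .map { case (k, kvs) => k -> kvs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    // Made-up rows: the second has too few columns, the third a bad amount.
    val lines = Seq("1,2,x,1,10.5", "bad line", "2,2,x,1,abc", "3,2,x,1,1.5")
    sumByKey(lines).foreach(println)
  }
}
```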

jwvh answered Sep 17 '22 22:09