
How to sum values and group them by a key value in Scala's List of Map?

Tags:

scala

I have a List of Map:

val list = List(
  Map("id" -> "A", "value" -> 20, "name" -> "a"),
  Map("id" -> "B", "value" -> 10, "name" -> "b"),
  Map("id" -> "A", "value" -> 5, "name" -> "a"),
  Map("id" -> "C", "value" -> 1, "name" -> "c"),
  Map("id" -> "D", "value" -> 60, "name" -> "d"),
  Map("id" -> "C", "value" -> 3, "name" -> "c")
)

I want to sum the values and group them by the id value, in the most efficient way, so it becomes:

Map(A -> 25, B -> 10, C -> 4, D -> 60)
asked Jan 30 '15

4 Answers

A) This one is the most readable, and performant if you have many items with the same id:

scala> list.groupBy(_("id")).mapValues(_.map(_("value").asInstanceOf[Int]).sum)
res14: scala.collection.immutable.Map[Any,Int] = Map(D -> 60, A -> 25, C -> 4, B -> 10)

You can also use list.groupBy(_("id")).par... here. It will be faster only if you have many elements with the same key; otherwise it will be extremely slow.

Otherwise, the thread context switching itself makes the .par version slower, as map(_("value")).sum (your nested map-reduce) may be faster than switching between threads. Roughly speaking, if N is the count of cores in the system, your inner map-reduce should be about N times slower than the switching overhead to benefit from par.

B) So, if parallelizing doesn't work well (it's better to check that with performance tests), you can just "reimplement" groupBy in a specialized way:

val m = scala.collection.mutable.Map[String, Int]() withDefaultValue(0)
for (e <- list; k = e("id").toString) m.update(k, m(k) + e("value").asInstanceOf[Int])
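For reference, running this loop against the question's list (repeated here so the snippet is self-contained and checkable) yields the expected totals:

```scala
val list = List(
  Map("id" -> "A", "value" -> 20, "name" -> "a"),
  Map("id" -> "B", "value" -> 10, "name" -> "b"),
  Map("id" -> "A", "value" -> 5,  "name" -> "a"),
  Map("id" -> "C", "value" -> 1,  "name" -> "c"),
  Map("id" -> "D", "value" -> 60, "name" -> "d"),
  Map("id" -> "C", "value" -> 3,  "name" -> "c")
)

// Mutable map with default value 0, so missing keys start at zero.
val m = scala.collection.mutable.Map[String, Int]() withDefaultValue (0)
for (e <- list; k = e("id").toString) m.update(k, m(k) + e("value").asInstanceOf[Int])
// m now holds: A -> 25, B -> 10, C -> 4, D -> 60
```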

C) The most parallelized option is:

val m = new scala.collection.concurrent.TrieMap[String, Int]()
for (e <- list.par; k = e("id").toString) {
    def replace = {           
       val v = m(k)
       m.replace(k, v, v + e("value").asInstanceOf[Int]) //atomic
    }
    m.putIfAbsent(k, 0) //atomic
    while(!replace){} //in case of conflict
}

scala> m
res42: scala.collection.concurrent.TrieMap[String,Int] = TrieMap(B -> 10, C -> 4, D -> 60, A -> 25)

D) The most parallelized option in functional style (slower, as it merges maps on every step, but best for distributed map-reduce without shared memory), using scalaz semigroups:

import scalaz._; import Scalaz._
scala> list.map(x => Map(x("id").asInstanceOf[String] -> x("value").asInstanceOf[Int]))
    .par.reduce(_ |+| _)
res3: scala.collection.immutable.Map[String,Int] = Map(C -> 4, D -> 60, A -> 25, B -> 10)

But it will be more performant only if you use a more complex aggregation than "+".
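If you'd rather not pull in the scalaz dependency, the map merge that |+| performs for Map[String, Int] can be sketched with a plain fold (a minimal sketch; the merge is associative, so it would also work under .par or reduce):

```scala
val list = List(
  Map("id" -> "A", "value" -> 20, "name" -> "a"),
  Map("id" -> "B", "value" -> 10, "name" -> "b"),
  Map("id" -> "A", "value" -> 5,  "name" -> "a")
)

// Merge two maps, summing values for duplicate keys
// (what scalaz's |+| does for Map[String, Int]).
def merge(a: Map[String, Int], b: Map[String, Int]): Map[String, Int] =
  b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0) + v) }

// Turn each row into a single-entry map, then merge pairwise.
val summed = list
  .map(m => Map(m("id").toString -> m("value").asInstanceOf[Int]))
  .reduce(merge)
// summed == Map(A -> 25, B -> 10)
```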


So let's do simple performance testing:

def time[T](n: Int)(f: => T) = {
  val start = System.currentTimeMillis()
  for(i <- 1 to n) f
  (System.currentTimeMillis() - start).toDouble / n
}
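For example, timing option A with this helper looks like the following (the measured number varies by machine; remember the warm-up run):

```scala
def time[T](n: Int)(f: => T): Double = {
  val start = System.currentTimeMillis()
  for (i <- 1 to n) f
  (System.currentTimeMillis() - start).toDouble / n
}

val list = List(
  Map("id" -> "A", "value" -> 20, "name" -> "a"),
  Map("id" -> "B", "value" -> 10, "name" -> "b")
)

// Average milliseconds per call over 1000 iterations of option A.
val avgMs = time(1000) {
  list.groupBy(_("id")).mapValues(_.map(_("value").asInstanceOf[Int]).sum)
}
```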

It's done in the Scala 2.12 REPL with JDK 8 on a MacBook Pro (2.3 GHz Intel Core i7). Every test was launched twice; the first run warms up the JVM.

1) For your input collection and time(100000){...}, from slowest to fastest:

`par.groupBy.par.mapValues` = 0.13861 ms
`groupBy.par.mapValues` = 0.07667 ms
`most parallelized` = 0.06184 ms    
`scalaz par.reduce(_ |+| _)` = 0.04010 ms //same for other reduce-based implementations, mentioned here
`groupBy.mapValues` = 0.00212 ms
`for` + `update` with mutable map initialization time = 0.00201 ms
`scalaz suml` = 0.00171 ms      
`foldLeft` from another answer = 0.00114 ms
`for` + `update` without mutable map initialization time = 0.00105 ms

So, foldLeft from another answer seems to be the best solution for your input.

2) Let's make it bigger

 scala> val newList = (1 to 1000).map(_ => list).reduce(_ ++ _)

Now with newList as input and time(1000){...}:

 `scalaz par.reduce(_ |+| _)` = 1.422 ms
 `foldLeft`/`for` = 0.418 ms
 `groupBy.par.mapValues` = 0.343 ms

And it's better to choose groupBy.par.mapValues here.

3) Finally, let's define another aggregation:

scala> implicit class RichInt(i: Int){ def ++ (i2: Int) = { Thread.sleep(1); i + i2}}
defined class RichInt

And test it with list and time(1000):

`foldLeft` = 7.742 ms
`most parallelized` = 3.315 ms

So it's better to use most parallelized version here.


Why reduce is so slow:

Let's take 8 elements. Reduce produces a calculation tree from the leaves [1] + ... + [1] to the root [1 + ... + 1]:

time( ([1] + [1]) + ([1] + [1]) + ([1] + [1]) + ([1] + [1])
   => ([1 + 1] + [1 + 1]) + ([1 + 1] + [1 + 1])
   => [1 + 1 + 1 + 1] + [1 + 1 + 1 + 1] )
 = (1 + 1 + 1 + 1) + (2 + 2) + 4 = 12

time(N = 8) = 8/2 + 2*8/4 + 4*8/8 = 8 * (1/2 + 2/4 + 4/8) = 8 * log2(8)/ 2 = 12

Or just:

time(N) = N * log2(N)/2

Of course this formula works only for numbers that are actually powers of 2. Anyway, the complexity is O(N log N), which is slower than foldLeft's O(N). Even after parallelization it becomes just O(N), so this implementation is useful only for Big Data's distributed map-reduce, or, simply put, when you don't have enough memory and are storing your Map in some cache.
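The level-by-level cost sum above can be checked numerically (a toy cost model, not a benchmark; it assumes N is a power of 2 and that merging two maps of size s costs s):

```scala
// Toy cost model for reducing n single-entry maps by pairwise merging:
// a tree level merging maps of size s has n / (2 * s) merges, each costing s.
def reduceCost(n: Int): Int =
  Iterator.iterate(1)(_ * 2)
    .takeWhile(_ < n)              // merge sizes 1, 2, ..., n/2
    .map(s => (n / (2 * s)) * s)   // merges per level * cost per merge
    .sum

// reduceCost(8) == 12, matching 8 * log2(8) / 2
```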

You may notice that it parallelizes better than the other options for your input - that's just because for 6 elements it's not so slow (almost O(1) here) and you do only one reduce call, while the other options either group the data first or simply create more threads, which leads to more thread-switching overhead. Simply put, reduce creates fewer threads here. But if you have more data, it doesn't scale, of course (see experiment 2).

answered Oct 12 '22 by dk14

Also using foldLeft:

list.foldLeft(Map[String, Int]().withDefaultValue(0))((res, v) => {
  val key = v("id").toString
  res + (key -> (res(key) + v("value").asInstanceOf[Int]))
})

UPDATE: with reduceLeft:

(Map[String, Any]().withDefaultValue(0) :: list).reduceLeft((res, v) => {
  val key = v("id").toString
  res + (key -> (res(key).asInstanceOf[Int] + v("value").asInstanceOf[Int]))
})

By the way, if you look at the definition of reduceLeft you'll see that it uses the same foldLeft:

  def reduceLeft[B >: A](f: (B, A) => B): B =
    if (isEmpty) throw new UnsupportedOperationException("empty.reduceLeft")
    else tail.foldLeft[B](head)(f)

UPDATE 2: with par and reduce. The problem here is distinguishing a result Map from an original list element; I chose to check contains("id").

list.par.reduce((a, b) => {
  def toResultMap(m: Map[String, Any]) =
    if (m.contains("id"))
      Map(m("id").toString -> m("value")).withDefaultValue(0)
    else m
  val aM = toResultMap(a)
  val bM = toResultMap(b)
  aM.foldLeft(bM)((res, v) =>
    res + (v._1 -> (res(v._1).asInstanceOf[Int] + v._2.asInstanceOf[Int])))
})
answered Oct 12 '22 by user5102379

I don't know about "most efficient", but the nicest way I can think of is using scalaz suml, which uses Monoid; the Monoid for Map does exactly what you want. The only ugly part is turning those Map[String, Any]s into something better-typed that represents the structure we want (e.g. Map("A" → 20)).

import scalaz._, Scalaz._
list.map{m => 
  Map(m("id").asInstanceOf[String] → m("value").asInstanceOf[Int])
}.suml
answered Oct 12 '22 by lmm

Starting with Scala 2.13, you can use the groupMapReduce method, which is (as its name suggests) an equivalent of a groupBy followed by a mapValues and a reduce step:

// val list = List(Map("id" -> "A", "value" -> 20, "name" -> "a"), Map("id" -> "B", "value" -> 10, "name" -> "b"), Map("id" -> "A", "value" -> 5, "name" -> "a"), Map("id" -> "C", "value" -> 1, "name" -> "c"), Map("id" -> "D", "value" -> 60, "name" -> "d"), Map("id" -> "C", "value" -> 3, "name" -> "c"))
list.groupMapReduce(_("id"))(_("value").asInstanceOf[Int])(_ + _)
// Map("A" -> 25, "B" -> 10, "C" -> 4, "D" -> 60)

This:

  • groups Maps by their "id" field (_("id")) (group part of groupMapReduce)

  • maps each grouped Map to their "value" field typed back to Int (_("value").asInstanceOf[Int]) (map part of groupMapReduce)

  • reduces values within each group (_ + _) by summing them (reduce part of groupMapReduce).

This is a one-pass equivalent of:

list.groupBy(_("id")).mapValues(_.map(_("value").asInstanceOf[Int]).reduce(_ + _)).toMap
answered Oct 12 '22 by Xavier Guihot