I have a List of Maps:
val list = List(
  Map("id" -> "A", "value" -> 20, "name" -> "a"),
  Map("id" -> "B", "value" -> 10, "name" -> "b"),
  Map("id" -> "A", "value" -> 5, "name" -> "a"),
  Map("id" -> "C", "value" -> 1, "name" -> "c"),
  Map("id" -> "D", "value" -> 60, "name" -> "d"),
  Map("id" -> "C", "value" -> 3, "name" -> "c")
)
I want to group the maps by id and sum their value fields in the most efficient way, so the result becomes:
Map(A -> 25, B -> 10, C -> 4, D -> 60)
A) This one is the most readable and performant option if you have many items with the same id:
scala> list.groupBy(_("id")).mapValues(_.map(_("value").asInstanceOf[Int]).sum)
res14: scala.collection.immutable.Map[Any,Int] = Map(D -> 60, A -> 25, C -> 4, B -> 10)
You can also use list.groupBy(_("id")).par... as well. It will be faster only if you have many elements with the same key; otherwise it will be extremely slow. Thread context switching itself makes the .par version slower, as the nested map(_("value")).sum (your inner map-reduce) may be cheaper than switching between threads. If N is the count of cores in the system, your map-reduce should be about N times slower to benefit from par, roughly speaking of course.
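For reference, the abbreviated chain above presumably expands to something like the following (my assumption, matching the `groupBy.par.mapValues` row in the benchmarks below):

// Group sequentially, then sum each group's values on the parallel view.
list.groupBy(_("id")).par.mapValues(_.map(_("value").asInstanceOf[Int]).sum)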
B) So, if parallelizing didn't work so well (it's better to check that with performance tests), you can just "reimplement" groupBy in a specialized way:
val m = scala.collection.mutable.Map[String, Int]() withDefaultValue(0)
for (e <- list; k = e("id").toString) m.update(k, m(k) + e("value").asInstanceOf[Int])
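For the sample list this fills m with the expected totals; a plausible REPL check (res numbering and entry order may differ):

scala> m
res0: scala.collection.mutable.Map[String,Int] = Map(A -> 25, B -> 10, C -> 4, D -> 60)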
C) The most parallelized option is:
val m = new scala.collection.concurrent.TrieMap[String, Int]()
for (e <- list.par; k = e("id").toString) {
  def replace = {
    val v = m(k)
    m.replace(k, v, v + e("value").asInstanceOf[Int]) //atomic
  }
  m.putIfAbsent(k, 0) //atomic
  while (!replace) {} //in case of conflict
}
scala> m
res42: scala.collection.concurrent.TrieMap[String,Int] = TrieMap(B -> 10, C -> 4, D -> 60, A -> 25)
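The putIfAbsent/replace pair is the standard lock-free pattern: replace(k, old, new) succeeds only if the value is still old, so the while loop simply retries whenever another thread won the race between the read of m(k) and the write.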
D) The most parallelized option in functional style (slower since it merges maps at every step, but best for distributed map-reduce without shared memory), using scalaz semigroups:
import scalaz._; import Scalaz._
scala> list.map(x => Map(x("id").asInstanceOf[String] -> x("value").asInstanceOf[Int]))
.par.reduce(_ |+| _)
res3: scala.collection.immutable.Map[String,Int] = Map(C -> 4, D -> 60, A -> 25, B -> 10)
But it will only outperform the alternatives if you use some more complex aggregation than "+".
So let's do some simple performance testing:
def time[T](n: Int)(f: => T) = {
  val start = System.currentTimeMillis()
  for (i <- 1 to n) f
  (System.currentTimeMillis() - start).toDouble / n
}
Tests were done in the Scala 2.12 REPL with JDK 8 on a MacBook Pro (2.3 GHz Intel Core i7). Every test was launched twice - the first run to warm up the JVM.
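Each row below comes from a call of this shape (the exact expressions measured are my reconstruction from the labels; shown here for `groupBy.mapValues`):

// First call warms up the JVM; the second call's result is the reported
// average time in milliseconds per run.
time(100000){ list.groupBy(_("id")).mapValues(_.map(_("value").asInstanceOf[Int]).sum) }
time(100000){ list.groupBy(_("id")).mapValues(_.map(_("value").asInstanceOf[Int]).sum) }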
1) For your input collection and time(100000){...}, from slowest to fastest:
`par.groupBy.par.mapValues` = 0.13861 ms
`groupBy.par.mapValues` = 0.07667 ms
`most parallelized` = 0.06184 ms
`scalaz par.reduce(_ |+| _)` = 0.04010 ms //same for other reduce-based implementations, mentioned here
`groupBy.mapValues` = 0.00212 ms
`for` + `update` with mutable map initialization time = 0.00201 ms
`scalaz suml` = 0.00171 ms
`foldLeft` from another answer = 0.00114 ms
`for` + `update` without mutable map initialization time = 0.00105 ms
So, foldLeft from another answer seems to be the best solution for your input.
2) Let's make it bigger:
scala> val newlist = (1 to 1000).map(_ => list).reduce(_ ++ _)
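That's 1000 copies of the original 6-element list concatenated:

scala> newlist.size
res0: Int = 6000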
Now with newlist as input and time(1000){...}:
`scalaz par.reduce(_ |+| _)` = 1.422 ms
`foldLeft`/`for` = 0.418 ms
`groupBy.par.mapValues` = 0.343 ms
And it's better to choose groupBy.par.mapValues here.
3) Finally, let's define another aggregation:
scala> implicit class RichInt(i: Int){ def ++ (i2: Int) = { Thread.sleep(1); i + i2}}
defined class RichInt
And test it with list and time(1000):
`foldLeft` = 7.742 ms
`most parallelized` = 3.315 ms
So it's better to use the most parallelized version here.
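As a sketch (my reconstruction, not code from the original benchmark), the slow aggregation plugs in by swapping + for the sleeping ++:

// foldLeft runs every ++ (and its ~1 ms sleep) strictly in sequence.
time(1000) {
  list.foldLeft(Map[String, Int]().withDefaultValue(0)) { (res, v) =>
    val key = v("id").toString
    res + (key -> (res(key) ++ v("value").asInstanceOf[Int]))
  }
}
// In the "most parallelized" TrieMap version, the + inside m.replace becomes
// ++ the same way, but the sleeps overlap across worker threads.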
Why reduce is so slow:
Let's take 8 elements. reduce builds a calculation tree from the leaves [1] + ... + [1] up to the root [1 + ... + 1]:

([1] + [1]) + ([1] + [1]) + ([1] + [1]) + ([1] + [1])
=> ([1 + 1] + [1 + 1]) + ([1 + 1] + [1 + 1])
=> [1 + 1 + 1 + 1] + [1 + 1 + 1 + 1]

Merging two maps costs time proportional to their size, so summing the merge costs level by level:

time = (1 + 1 + 1 + 1) + (2 + 2) + 4 = 12
time(N = 8) = 1 * 8/2 + 2 * 8/4 + 4 * 8/8 = 8 * (1/2 + 2/4 + 4/8) = 8 * log2(8) / 2 = 12

Or just: time(N) = N * log2(N) / 2.

Of course this formula only holds exactly for counts that are powers of 2. Either way, the complexity is O(N log N), which is slower than foldLeft's O(N). Even after parallelization it becomes just O(N), so this implementation is only worth using for Big Data-style distributed map-reduce, or simply put, when you don't have enough memory and are storing your Map in some cache.
You may notice that it parallelizes better than the other options for your input - that's just because for 6 elements it's not so slow (almost O(1) here) and you make only one reduce call, while the other options either group the data first or just create more threads, which leads to more thread-switching overhead. Simply put, reduce creates fewer threads here. But with more data it doesn't hold up, of course (see experiment 2).
Also, using foldLeft:
list.foldLeft(Map[String, Int]().withDefaultValue(0))((res, v) => {
  val key = v("id").toString
  res + (key -> (res(key) + v("value").asInstanceOf[Int]))
})
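For the sample list this returns Map(A -> 25, B -> 10, C -> 4, D -> 60) (entry order may vary).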
UPDATE: with reduceLeft:
(Map[String, Any]().withDefaultValue(0) :: list).reduceLeft((res, v) => {
  val key = v("id").toString
  res + (key -> (res(key).asInstanceOf[Int] + v("value").asInstanceOf[Int]))
})
By the way, if you look at the reduceLeft definition you'll see that it uses the same foldLeft:
def reduceLeft[B >: A](f: (B, A) => B): B =
  if (isEmpty) throw new UnsupportedOperationException("empty.reduceLeft")
  else tail.foldLeft[B](head)(f)
UPDATE 2: with par and reduce:
The problem here is to distinguish a result Map from an initial input Map. I chose contains("id") as the discriminator.
list.par.reduce((a, b) => {
  def toResultMap(m: Map[String, Any]) =
    if (m.contains("id"))
      Map(m("id").toString -> m("value")).withDefaultValue(0)
    else m
  val aM = toResultMap(a)
  val bM = toResultMap(b)
  aM.foldLeft(bM)((res, v) =>
    res + (v._1 -> (res(v._1).asInstanceOf[Int] + v._2.asInstanceOf[Int])))
})
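Entry order aside, this also yields Map(A -> 25, B -> 10, C -> 4, D -> 60) for the sample list. Note that, unlike the reduceLeft version with its seed element, reduce requires the list to be non-empty.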
I don't know about "most efficient", but the nicest way I can think of is scalaz suml, which uses Monoid; the Monoid for Map does exactly what you want. The only ugly part is turning those Map[String, Any]s into something better-typed that represents the structure we want (e.g. Map("A" → 20)).
import scalaz._, Scalaz._

list.map { m =>
  Map(m("id").asInstanceOf[String] → m("value").asInstanceOf[Int])
}.suml
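Under the hood, the Monoid for Map merges two maps and combines values that collide on a key using the value's own Monoid (here Int addition); a small illustration, assuming the same scalaz imports are in scope:

Map("A" -> 20) |+| Map("A" -> 5, "B" -> 10)  // Map(A -> 25, B -> 10)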
Starting with Scala 2.13, you can use the groupMapReduce method, which is (as its name suggests) an equivalent of a groupBy followed by mapValues and a reduce step:
// val list = List(Map("id" -> "A", "value" -> 20, "name" -> "a"), Map("id" -> "B", "value" -> 10, "name" -> "b"), Map("id" -> "A", "value" -> 5, "name" -> "a"), Map("id" -> "C", "value" -> 1, "name" -> "c"), Map("id" -> "D", "value" -> 60, "name" -> "d"), Map("id" -> "C", "value" -> 3, "name" -> "c"))
list.groupMapReduce(_("id"))(_("value").asInstanceOf[Int])(_ + _)
// Map("A" -> 25, "B" -> 10, "C" -> 4, "D" -> 60)
This:

- groups Maps by their "id" field (_("id")) (group part of groupMapReduce)
- maps each grouped Map to its "value" field typed back to Int (_("value").asInstanceOf[Int]) (map part of groupMapReduce)
- reduces values within each group by summing them (_ + _) (reduce part of groupMapReduce).
This is a one-pass equivalent of:
list.groupBy(_("id")).mapValues(_.map(_("value").asInstanceOf[Int]).reduce(_ + _)).toMap