For any given collection of Map, for instance,
val in = Array( Map("a" -> 1, "b" -> 2),
Map("a" -> 11, "c" -> 4),
Map("b" -> 7, "c" -> 10))
how can I use aggregate on in.par so as to merge the maps into
Map("a" -> 12, "b" -> 9, "c" -> 14)
Note: Map merging has been asked about multiple times, but I am looking for a solution that uses aggregate on parallel collections.
Many thanks.
Alternatively, we can use the Stream.concat() function to merge the maps. This function combines two streams into one: we pass the entry streams of map1 and map2 to concat() and then collect the combined entries.
Assuming that both maps contain the same set of keys and that you want to "combine" the values, what you are looking for is a Pair class. Iterate over one of the maps, retrieve the values from both maps, create a Pair, and put it into your result map.
You can combine two maps in Java by using the putAll() method of the java.util.Map interface. This method copies all the mappings from one Map to another, like, if you call it like first.
How about applying merge as both seqop and comboop?
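val in = Array(
  Map("a" -> 1, "b" -> 2),
  Map("a" -> 11, "c" -> 4),
  Map("b" -> 7, "c" -> 10)
)
// Merge two maps, adding the values of keys that appear in both.
def merge(m1: Map[String, Int], m2: Map[String, Int]): Map[String, Int] =
  m1 ++ m2.map { case (k, v) => k -> (v + m1.getOrElse(k, 0)) }
// On Scala 2.13+, .par needs the scala-parallel-collections module and
// import scala.collection.parallel.CollectionConverters._
in.par.aggregate(Map[String, Int]())(merge, merge)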
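To see what merge does on its own, here is a small sketch that applies it to the first two maps from the question. The object name MergeExample and the value names are only for illustration. It also checks that the empty map behaves as a neutral element, which is what makes it a reasonable initial accumulator.

```scala
object MergeExample {
  // Same merge as in the answer: values of keys present in both maps are added.
  def merge(m1: Map[String, Int], m2: Map[String, Int]): Map[String, Int] =
    m1 ++ m2.map { case (k, v) => k -> (v + m1.getOrElse(k, 0)) }

  val first  = Map("a" -> 1, "b" -> 2)
  val second = Map("a" -> 11, "c" -> 4)

  // "a" occurs in both maps, so its values are summed; "b" and "c" are kept as they are.
  val merged: Map[String, Int] = merge(first, second)

  // Merging with the empty map on either side leaves the other map unchanged.
  val leftNeutral: Map[String, Int]  = merge(Map.empty[String, Int], first)
  val rightNeutral: Map[String, Int] = merge(first, Map.empty[String, Int])
}
```

Because the initial value may be used once per partition, it should not change the result when merged in, and the empty map satisfies that here.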
Update
You pass aggregate an initial accumulator value (an empty map) and two closures, seqop and comboop.
A parallel sequence is split into several partitions that are processed in parallel. Each partition is processed by successively applying seqop to the accumulator and the next array element.
def seqop(
    accumulator: Map[String, Int],
    element: Map[String, Int]): Map[String, Int] = merge(accumulator, element)
seqop takes the initial accumulator value and the first array element and merges them. Next it takes the previous result and the next array element, and so on until the whole partition is merged into one map.
Once every partition has been merged into its own map, these maps are combined by applying comboop. Conceptually, comboop takes the merged map from the first partition and the merged map from the second partition and merges them. Next it takes the previous result and the map from the third partition, and so on until everything is merged into one map. This is the result of aggregate.
def comboop(
    m1: Map[String, Int],
    m2: Map[String, Int]): Map[String, Int] = merge(m1, m2)
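The two phases described above can be imitated on an ordinary sequential collection. The following is only a sketch of the idea, not how the parallel collections library is implemented: the split into groups of two is an assumption made for illustration (a real parallel collection chooses its own partitions), and the names AggregateSketch, partitions and partials are hypothetical.

```scala
object AggregateSketch {
  def merge(m1: Map[String, Int], m2: Map[String, Int]): Map[String, Int] =
    m1 ++ m2.map { case (k, v) => k -> (v + m1.getOrElse(k, 0)) }

  val in = Array(
    Map("a" -> 1, "b" -> 2),
    Map("a" -> 11, "c" -> 4),
    Map("b" -> 7, "c" -> 10)
  )

  // Assumed split: the first two maps form one partition, the last map another.
  val partitions: List[Array[Map[String, Int]]] = in.grouped(2).toList

  // Phase 1 (seqop): fold each partition, starting from the empty map.
  val partials: List[Map[String, Int]] =
    partitions.map(_.foldLeft(Map.empty[String, Int])(merge))

  // Phase 2 (comboop): combine the per-partition results into one map.
  val result: Map[String, Int] = partials.reduce(merge)
}
```

With this split, partials holds one merged map per partition, and result is the map asked for in the question.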
It is just a coincidence that seqop and comboop are the same here. In general they differ in logic and in signature.
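As an illustration of the two functions differing, consider summing every value across all the maps into a single Int. This task is not part of the question and is used only as an example. Here seqop combines an Int accumulator with a Map element, while comboop combines two Int partial results. As before, the partitioning is simulated by hand with an assumed split, and the object name DifferentOps is hypothetical.

```scala
object DifferentOps {
  val in = Array(
    Map("a" -> 1, "b" -> 2),
    Map("a" -> 11, "c" -> 4),
    Map("b" -> 7, "c" -> 10)
  )

  // seqop: accumulator type (Int) differs from the element type (Map[String, Int]).
  def seqop(acc: Int, element: Map[String, Int]): Int = acc + element.values.sum

  // comboop: both arguments are partial results of the accumulator type.
  def comboop(left: Int, right: Int): Int = left + right

  // Assumed split into partitions of up to two maps each, folded with seqop.
  val partials: List[Int] = in.grouped(2).toList.map(_.foldLeft(0)(seqop))

  // Partial sums combined with comboop.
  val total: Int = partials.reduce(comboop)
}
```

On a parallel collection the same pair of functions would be passed as in.par.aggregate(0)(seqop, comboop).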