Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Merging Maps using `aggregate`

For any given collection of Map, for instance,

val in = Array( Map("a" -> 1,  "b" -> 2),
                Map("a" -> 11, "c" -> 4),
                Map("b" -> 7,  "c" -> 10))

how to use aggregate on in.par so as to merge the maps into

Map ( "a" -> 12, "b" -> 9, "c" -> 14 )

Note Map merging has been asked multiple times, yet looking for a solution with aggregate on parallel collections.

Many Thanks

like image 991
elm Avatar asked Aug 20 '14 08:08

elm


People also ask

How do I merge two maps together?

Alternatively, we can use Stream#concat() function to merge the maps together. This function can combine two different streams into one. As shown in the snippet, we are passed the streams of map1 and map2 to the concate() function and then collected the stream of their combined entry elements.

How can I combine two Hashmap objects containing the different types?

Assuming that both maps contain the same set of keys, and that you want to "combine" the values, the thing you would be looking for is a Pair class, see here for example. You simply iterate one of the maps; and retrieve values from both maps; and create a Pair; and push that in your result map.

How to combine map values in Java?

You can combine two maps in Java by using the putAll() method of java. util. Map interface. This method copies all the mappings from one Map to another, like, if you call it like first.


1 Answers

How about applying merge as both seqop and comboop?

val in = Array(
  Map("a" -> 1,  "b" -> 2),
  Map("a" -> 11, "c" -> 4),
  Map("b" -> 7,  "c" -> 10)
)

def merge(m1: Map[String, Int], m2: Map[String, Int]): Map[String, Int] =
  m1 ++ m2.map { case (k, v) => k -> (v + m1.getOrElse(k, 0)) }

in.par.aggregate(Map[String, Int]())(merge, merge)

Update

You pass to aggregate initial accumulator value(empty map) and two closures - seqop and comboop.

Parallel sequence splits in several partitions to be processed in parallel. Each partition is processed by successively applying seqop to accumulator and array element.

def seqop(
    accumulator: Map[String, Int], 
    element: Map[String, Int]): Map[String, Int] = merge(accumulator, element)

seqop takes initial accumulator value and first array element and merges it. Next it takes previous result and next array element and so on until whole partition is merged in one map.

When every partition is merged in a separate map, these maps should be combined by applying comboop. comboop takes merged map from first partition and merged map from second partition and merges it together. Next it takes previous result and map from third partition and so on until all is merged in one map. This is the result of aggregate.

def comboop(
    m1: Map[String, Int], 
    m2: Map[String, Int]): Map[String, Int] = merge(m1, m2)

It is just coincidence that seqop and comboop are the same. In general they differs in logic and signatures.

like image 51
lambdas Avatar answered Sep 21 '22 18:09

lambdas