Could you explain how to use the new groupBy
in akka-streams? The documentation is not much help here. groupBy
used to return (T, Source),
but it no longer does. Here is my example (I mimicked one from the docs):
Source(List(
  1 -> "1a", 1 -> "1b", 1 -> "1c",
  2 -> "2a", 2 -> "2b",
  3 -> "3a", 3 -> "3b", 3 -> "3c",
  4 -> "4a",
  5 -> "5a", 5 -> "5b", 5 -> "5c",
  6 -> "6a", 6 -> "6b",
  7 -> "7a",
  8 -> "8a", 8 -> "8b",
  9 -> "9a", 9 -> "9b",
))
  .groupBy(3, _._1)
  .map { case (aid, raw) =>
    aid -> List(raw)
  }
  .reduce[(Int, List[String])] { case (l: (Int, List[String]), r: (Int, List[String])) =>
    (l._1, l._2 ::: r._2)
  }
  .mergeSubstreams
  .runForeach { case (aid: Int, items: List[String]) =>
    println(s"$aid - ${items.length}")
  }
This simply hangs. Perhaps it hangs because the number of substreams is lower than the number of unique keys. But what should I do if I have an infinite stream? I'd like to group until the key changes.
In my real stream, the data is always sorted by the value I'm grouping by. Perhaps I don't need groupBy
at all?
A year later, Akka Stream Contrib has an AccumulateWhileUnchanged class that does this:
libraryDependencies += "com.typesafe.akka" %% "akka-stream-contrib" % "0.9"
and:
import akka.stream.contrib.AccumulateWhileUnchanged
source.via(new AccumulateWhileUnchanged(_._1))
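To make the intended behaviour concrete, here is a minimal plain-Scala sketch of "accumulate while the key is unchanged" on an ordinary Iterator. It does not use Akka and is not the contrib stage's implementation; the object name AccumulateSketch and the helper accumulateWhileUnchanged are hypothetical names chosen for illustration. It assumes, as in the question, that elements with the same key arrive next to each other.

```scala
// Plain-Scala sketch (no Akka dependency): group consecutive elements
// while the extracted key stays the same.
object AccumulateSketch {
  // `accumulateWhileUnchanged` is a hypothetical helper name, not an Akka API.
  def accumulateWhileUnchanged[A, K](input: Iterator[A])(key: A => K): Iterator[Vector[A]] =
    new Iterator[Vector[A]] {
      // A buffered iterator lets us peek at the next element without consuming it.
      private val buffered = input.buffered

      def hasNext: Boolean = buffered.hasNext

      def next(): Vector[A] = {
        val first = buffered.next()
        val k = key(first)
        val group = Vector.newBuilder[A]
        group += first
        // Pull elements only while the key is unchanged, so groups are
        // produced lazily, one at a time, even on an unbounded iterator.
        while (buffered.hasNext && key(buffered.head) == k) group += buffered.next()
        group.result()
      }
    }

  def main(args: Array[String]): Unit = {
    val data = Iterator(1 -> "1a", 1 -> "1b", 1 -> "1c", 2 -> "2a", 2 -> "2b", 3 -> "3a")
    accumulateWhileUnchanged(data)(_._1).foreach { group =>
      println(s"${group.head._1} - ${group.length}")
    }
  }
}
```

Because a group is emitted as soon as the key changes, only one group is held in memory at a time and no upper bound on the number of distinct keys is needed, which is the property the question asks for on an infinite, sorted stream.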