Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do I group items of sorted stream with SubFlows?

Could you guys explain how to use new groupBy in akka-streams ? Documentation seems to be quite useless. groupBy used to return (T, Source) but not anymore. Here is my example (I mimicked one from docs):

Source(List(
  1 -> "1a", 1 -> "1b", 1 -> "1c",
  2 -> "2a", 2 -> "2b",
  3 -> "3a", 3 -> "3b", 3 -> "3c",
  4 -> "4a",
  5 -> "5a", 5 -> "5b", 5 -> "5c",
  6 -> "6a", 6 -> "6b",
  7 -> "7a",
  8 -> "8a", 8 -> "8b",
  9 -> "9a", 9 -> "9b",
))
  .groupBy(3, _._1)
  .map { case (aid, raw) =>
    aid -> List(raw)
  }
  .reduce[(Int, List[String])] { case (l: (Int, List[String]), r: (Int, List[String])) =>
  (l._1, l._2 ::: r._2)
}
  .mergeSubstreams
  .runForeach { case (aid: Int, items: List[String]) =>
    println(s"$aid - ${items.length}")
  }

This simply hangs. Perhaps it hangs because number of substreams is lower than number of unique keys. But what should I do if I have infinite stream ? I'd like to group until key changes.

In my real stream data is always sorted by value I'm grouping by. Perhaps I don't need groupBy at all ?

like image 287
expert Avatar asked Dec 19 '22 05:12

expert


1 Answers

A year later, Akka Stream Contrib has a AccumulateWhileUnchanged class that does this:

libraryDependencies += "com.typesafe.akka" %% "akka-stream-contrib" % "0.9"

and:

import akka.stream.contrib.AccumulateWhileUnchanged
source.via(new AccumulateWhileUnchanged(_._1))
like image 145
Yossi Avatar answered Jan 05 '23 15:01

Yossi