
Akka streams. Group by, aggregate for some time and emit result

I have an infinite stream of elements that I want to group by id, aggregate each group for, let's say, 2 seconds, and then send the aggregated groups downstream. Here is some code that doesn't work, but may better explain what I want:

  Source
    .tick(0 second, 50 millis, () => if (Random.nextBoolean) (1, s"A") else (2, s"B"))
    .map { f => f() }
    .groupBy(10, _._1)
    // how to aggregate grouped elements here for two seconds?
    .scan(Seq[String]()) { (x, y) => x ++ Seq(y._2) }
    .to(Sink.foreach(println))

And desired output should look like this:

Seq(A, A, A, A, A)
Seq(B, B, B)
Seq(A, A)
Seq(B, B, B, B, B)
// and so on

How can I achieve such functionality with streams?

asked Mar 02 '17 08:03 by Artem Malinko


1 Answer

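You need groupedWithin in your flow :) It collects elements into a batch and emits the batch when either a maximum element count is reached or a time window has elapsed, whichever happens first.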

http://doc.akka.io/docs/akka/2.4.17/scala/stream/stages-overview.html#groupedwithin
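A minimal sketch of how that could look for the stream in the question, assuming Akka 2.4.x (the version linked above) with an explicit ActorMaterializer. The object name, the actor system name, and the maxBatch limit of 1000 are illustrative choices, not something from the question; the limit is simply set high enough that the 2 second window, not the element count, closes each batch.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.duration._
import scala.util.Random

object GroupedWithinExample extends App {
  // Names below are hypothetical; only the stream stages matter.
  implicit val system: ActorSystem = ActorSystem("grouped-within-example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Assumed upper bound per batch; at one element every 50 ms it is never reached.
  val maxBatch = 1000

  Source
    .tick(0.seconds, 50.millis, ())                              // one tick every 50 ms
    .map(_ => if (Random.nextBoolean()) (1, "A") else (2, "B"))  // random (id, value) pair
    .groupBy(10, _._1)                                           // one substream per id
    .groupedWithin(maxBatch, 2.seconds)                          // batch each substream for up to 2 seconds
    .map(batch => batch.map(_._2))                               // keep only the values
    .mergeSubstreams                                             // back to a single stream
    .runForeach(batch => println(batch))                         // print each batch as it arrives
}
```

Because groupedWithin is applied after groupBy, every id gets its own 2 second window, and mergeSubstreams then interleaves the finished batches into one stream. The stream is infinite, so the program runs until the actor system is terminated.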

answered Oct 26 '22 15:10 by expert