
How to group incoming events from infinite stream?

I have an infinite stream of events:

(timestamp, session_uid, traffic)

for example:

...
(1448089943, session-1, 10)
(1448089944, session-1, 20)
(1448089945, session-2, 50)
(1448089946, session-1, 30)
(1448089947, session-2, 10)
(1448089948, session-3, 10)
...

I want to group these events by session_uid and calculate the sum of traffic for each session.
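In other words, a finite batch of the sample events above should reduce to the following per-session totals (a plain-Scala sketch of the desired result, independent of any streaming library):

```scala
// The sample events from above, as plain tuples.
val events = List(
  (1448089943L, "session-1", 10),
  (1448089944L, "session-1", 20),
  (1448089945L, "session-2", 50),
  (1448089946L, "session-1", 30),
  (1448089947L, "session-2", 10),
  (1448089948L, "session-3", 10)
)

// Group by session_uid and sum the traffic field.
val totals: Map[String, Int] =
  events.groupBy(_._2).map { case (uid, evs) => uid -> evs.map(_._3).sum }

// totals == Map("session-1" -> 60, "session-2" -> 60, "session-3" -> 10)
```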

I wrote an akka-streams flow which works fine with a finite stream using groupBy (my code is based on this example from the cookbook). But with an infinite stream it will not work, because groupBy must process the entire incoming stream before it can return a result.

I think I should implement grouping with a timeout, i.e. if I don't receive any event with a given session_uid for more than 5 minutes after the last one, I should emit the grouped events for that session_uid. But how can I implement this using akka-streams only?

asked Nov 21 '15 by Maxim

1 Answer

I came up with a somewhat gnarly solution but I think it gets the job done.

The essential idea is to use the keepAlive method of Source as the timer that will trigger completion.

But to do this we first have to abstract the data a bit. The stream will need to carry either the timer's trigger or a tuple value from the original Source, therefore:

sealed trait Data

object TimerTrigger extends Data
case class Value(tstamp : Long, session_uid : String, traffic : Int) extends Data

Then convert our Source of tuples to a Source of Values. We'll still use groupBy to do groupings similar to your finite stream case:

import scala.concurrent.Future
import scala.concurrent.duration._
import akka.stream.scaladsl.{Sink, Source}

val originalSource : Source[(Long, String, Int), Unit] = ???

type IDGroup = (String, Source[Value, Unit]) //uid -> Source of Values for uid

val groupedDataSource : Source[IDGroup, Unit] = 
  originalSource.map(t => Value(t._1, t._2, t._3))
                .groupBy(_.session_uid)

The tricky part is handling the groupings, which are just tuples: (String, Source[Value, Unit]). We need the timer to notify us when time has elapsed, so we need another abstraction to know whether we're still computing or have completed the computation due to a timeout:

sealed trait Sum {
  val sum : Int
}
case class StillComputing(val sum : Int) extends Sum
case class ComputedSum(val sum : Int) extends Sum

val zeroSum : Sum = StillComputing(0)

Now we can drain the Source of each group. The keepAlive will send a TimerTrigger if the Source of Values doesn't produce something after the timeOut. The Data from the keepAlive is then pattern matched against either a TimerTrigger or a new Value from the original Source:

val evaluateSum : (Sum, Data) => Sum = {
  case (runningSum, TimerTrigger) => ComputedSum(runningSum.sum)
  case (runningSum, v : Value)    => StillComputing(runningSum.sum + v.traffic)
}

type SumResult = (String, Future[Int]) // uid -> Future of traffic sum for uid

// Requires an implicit materializer in scope for runWith.
def handleGroup(timeOut : FiniteDuration)(idGroup : IDGroup) : SumResult =
  idGroup._1 -> idGroup._2.keepAlive(timeOut, () => TimerTrigger)
                          .scan(zeroSum)(evaluateSum)
                          .collect { case c : ComputedSum => c.sum }
                          .runWith(Sink.head)

The collect is applied with a partial function that only matches a finished sum, so the Sink is only reached after the timer has fired.
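As a quick illustration of why the Sink sees exactly one element: collect drops every StillComputing and keeps only the final ComputedSum. A plain-Scala analogy of the scan-then-collect stages (the Sum types are redeclared here so the snippet stands alone):

```scala
sealed trait Sum { val sum: Int }
case class StillComputing(sum: Int) extends Sum
case class ComputedSum(sum: Int) extends Sum

// What a scan over the traffic values 10, 20, 30 followed by a
// timer trigger would produce, step by step:
val scanned = List(StillComputing(0), StillComputing(10),
                   StillComputing(30), StillComputing(60), ComputedSum(60))

// collect keeps only the ComputedSum, so the stream's head is the final total.
val results = scanned.collect { case c: ComputedSum => c.sum }

// results == List(60)
```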

We then apply this handler to each grouping that comes out:

val timeOut = FiniteDuration(5, MINUTES)

val sumSource : Source[SumResult, Unit] = 
  groupedDataSource map handleGroup(timeOut)

We now have a Source of (String,Future[Int]) which is the session_uid and a Future of the sum of traffic for that id.
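One way to consume this Source (a sketch only; it assumes an implicit materializer and ExecutionContext are in scope, as required by runForeach and the Future callback):

```scala
// Sketch: print each session's total once its timeout has fired.
// Assumes implicit materializer and ExecutionContext in scope.
val done = sumSource.runForeach { case (uid, futureSum) =>
  futureSum.foreach(total => println(s"$uid -> $total"))
}
```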

Like I said, it's convoluted, but it meets the requirements. Also, I'm not entirely sure what happens if a uid that was already grouped and timed out later shows up again with a new value.

answered Nov 15 '22 by Ramón J Romero y Vigil