I have an infinite stream of events:
(timestamp, session_uid, traffic)
e.g.
...
(1448089943, session-1, 10)
(1448089944, session-1, 20)
(1448089945, session-2, 50)
(1448089946, session-1, 30)
(1448089947, session-2, 10)
(1448089948, session-3, 10)
...
I want to group these events by session_uid and calculate the sum of traffic for each session.
I wrote an akka-streams flow that works fine with a finite stream, using groupBy (my code is based on this example from the cookbook). But with an infinite stream it won't work, because groupBy has to process the entire incoming stream before it is ready to return a result.
I think I should implement grouping with a timeout, i.e. if I don't receive an event with a given session_uid for more than 5 minutes after the last one, I should emit the grouped events for that session_uid. But how can I implement this using akka-streams only?
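Roughly, the finite-stream version looks like this (a simplified sketch on the older groupBy API that emits (key, Source) pairs; names are illustrative):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

implicit val system = ActorSystem("finite-grouping")
implicit val materializer = ActorMaterializer()

// A finite sample of events; the grouping only completes because this source completes.
val finiteEvents : Source[(Long, String, Int), Unit] =
  Source(List(
    (1448089943L, "session-1", 10),
    (1448089944L, "session-1", 20),
    (1448089945L, "session-2", 50)
  ))

// Per-session sums. On an infinite stream the inner sources never complete,
// so the folded sums are never delivered, which is exactly the problem I'm hitting.
val sums : Source[(String, Future[Int]), Unit] =
  finiteEvents
    .groupBy(_._2)
    .map { case (uid, events) =>
      uid -> events.runWith(Sink.fold[Int, (Long, String, Int)](0)(_ + _._3))
    }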
I came up with a somewhat gnarly solution, but I think it gets the job done.
The essential idea is to use the keepAlive method of Source as the timer that will trigger completion.
But to do this we first have to abstract the data a bit: the stream needs to carry either the timer's trigger or a tuple value from the original Source, therefore:
sealed trait Data
object TimerTrigger extends Data
case class Value(tstamp : Long, session_uid : String, traffic : Int) extends Data
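(For completeness: the snippets below assume the older Akka Streams API, where groupBy emits (key, Source) pairs and the materialized type is Unit, plus roughly these imports and an implicit materializer in scope.)

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future
import scala.concurrent.duration._

implicit val system = ActorSystem("traffic-sums")
implicit val materializer = ActorMaterializer()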
Then convert our Source of tuples to a Source of Values. We'll still use groupBy to do the grouping, similar to your finite-stream case:
val originalSource : Source[(Long, String, Int), Unit] = ???

type IDGroup = (String, Source[Value, Unit]) // uid -> Source of Values for uid

val groupedDataSource : Source[IDGroup, Unit] =
  originalSource.map(t => Value(t._1, t._2, t._3))
                .groupBy(_.session_uid)
The tricky part is handling the groupings, which are just tuples: (String, Source[Value, Unit]). We need the timer to notify us when time has elapsed, so we need another abstraction to know whether we're still computing or have completed the computation due to a timeout:
sealed trait Sum {
  val sum : Int
}
case class StillComputing(sum : Int) extends Sum
case class ComputedSum(sum : Int) extends Sum

val zeroSum : Sum = StillComputing(0)
Now we can drain the Source of each group. The keepAlive will send a TimerTrigger if the Source of Values doesn't produce anything within the timeOut. The Data coming out of keepAlive is then pattern-matched as either a TimerTrigger or a new Value from the original Source:
val evaluateSum : (Sum, Data) => Sum = {
  case (runningSum, data) =>
    data match {
      case TimerTrigger => ComputedSum(runningSum.sum)
      case v : Value    => StillComputing(runningSum.sum + v.traffic)
    }
} // end evaluateSum
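As a quick sanity check outside the stream, folding a few hypothetical Data values by hand shows the transition from StillComputing to ComputedSum:

// Purely illustrative: two values for a session followed by the timer trigger.
val sample : List[Data] =
  List(Value(1448089943L, "session-1", 10),
       Value(1448089944L, "session-1", 20),
       TimerTrigger)

sample.scanLeft(zeroSum)(evaluateSum)
// List(StillComputing(0), StillComputing(10), StillComputing(30), ComputedSum(30))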
type SumResult = (String, Future[Int]) // uid -> Future of traffic sum for uid

def handleGroup(timeOut : FiniteDuration)(idGroup : IDGroup) : SumResult =
  idGroup._1 -> idGroup._2.keepAlive(timeOut, () => TimerTrigger)
                          .scan(zeroSum)(evaluateSum)
                          .collect { case c : ComputedSum => c.sum }
                          .runWith(Sink.head)
The collect is applied with a partial function that only matches a finished sum, so the Sink is only reached after the timer has fired.
We then apply this handler to each grouping that comes out:
val timeOut = FiniteDuration(5, MINUTES)

val sumSource : Source[SumResult, Unit] =
  groupedDataSource map handleGroup(timeOut)
We now have a Source of (String, Future[Int]), i.e. the session_uid paired with a Future of the sum of traffic for that id.
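To actually consume the results, something along these lines would work (illustrative only; it assumes the implicit materializer mentioned above):

// Print each per-session sum once its timeout has fired.
sumSource.runWith(Sink.foreach[SumResult] { case (uid, futureSum) =>
  futureSum.foreach(sum => println(s"$uid -> $sum"))(system.dispatcher)
})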
Like I said, convoluted, but it meets the requirements. Also, I'm not entirely sure what happens if a uid that was already grouped and timed out later shows up again with a new value.