 

How to clean up substreams in continuous Akka streams

I have a very long-running stream of events flowing through something like the code shown below. After a long time has passed, many substreams will have been created that are no longer needed.

Is there a way to clean up a specific substream at a given time? For example, the substream created for id 3 should be cleaned up, and the state held in its scan lost, at 13:00 (the expires property of Wid).

case class Wid(id: Int, v: String, expires: LocalDateTime)
test("Substream with scan") {
  val (pub, sub) = TestSource.probe[Wid]
    .groupBy(Int.MaxValue, _.id)
    .scan("")((a: String, b: Wid) => a + b.v)
    .mergeSubstreams
    .toMat(TestSink.probe[String])(Keep.both)
    .run()
}
user3139545 asked May 17 '17 05:05



1 Answer

TL;DR You can close a substream after some time. However, using input to dynamically set the time with built-in stages is another matter.

Closing a substream

To close a flow, you usually complete it (from upstream), but you can also cancel it (from downstream). For instance, the take(n: Int) flow will cancel once n elements have gone through.

Now, in the groupBy case, you cannot complete a single substream, since upstream is shared by all substreams, but you can cancel it. How you do so depends on the condition that should end it.

However, be aware that groupBy drops inputs for subflows that have already been closed: if a new element with id 3 comes from upstream to the groupBy after the 3-substream has been closed, it will simply be ignored and the next element will be pulled in. The reason for this is probably that some elements might be lost between the closing and re-opening of the substream. Also, if your stream is supposed to run for a very long time, this will affect performance, because each element is checked against the list of closed substreams before being forwarded to the relevant (live) substream. You might want to implement your own stateful filter (say, with a Bloom filter) if you are not satisfied with the performance of this.

To close a substream, I usually use either take (if you want only a given number of elements, but that's probably not the case on an infinite stream), or some kind of timeout: either completionTimeout if you want a fixed time from materialization to closure, or idleTimeout if you want to close when no element has come through for some time. Note that these flows do not cancel the stream but fail it, so you have to catch the exception with a recover or recoverWith stage to turn the failure into a cancel (recoverWith lets you cancel without sending any last element, by recovering with Source.empty).
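As a rough illustration of the idleTimeout-plus-recover approach described above, here is a minimal sketch. The names SubstreamExpiry and expiringGroups are made up for this example, and it uses recoverWithRetries with a single attempt in place of recoverWith; treat it as one possible wiring under those assumptions, not the only one.

```scala
import java.time.LocalDateTime
import java.util.concurrent.TimeoutException

import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}

import scala.concurrent.duration._

object SubstreamExpiry {
  // Same shape as the element type in the question
  case class Wid(id: Int, v: String, expires: LocalDateTime)

  // Hypothetical helper: groups by id and closes a substream once it has
  // seen no element for `idle`.
  def expiringGroups(idle: FiniteDuration): Flow[Wid, String, NotUsed] =
    Flow[Wid]
      .groupBy(Int.MaxValue, _.id)
      // Fails this substream (and cancels its input) after `idle` of silence
      .idleTimeout(idle)
      // Turn the timeout failure into a normal completion with no extra element
      .recoverWithRetries(1, { case _: TimeoutException => Source.empty[Wid] })
      // Per-substream state; it is dropped when the substream ends
      .scan("")((acc, w) => acc + w.v)
      .mergeSubstreams
}
```

The timeout is placed before scan so that the accumulated state is discarded together with the substream. As noted above, elements that arrive later with the same key are dropped by groupBy rather than starting a fresh substream.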

Dynamically set the timeout

Now, what you want is to set the closing time dynamically, according to the first element that passes. This is more complicated because the materialization of streams is independent of the elements that pass through them. Indeed, in the usual case (without groupBy), streams are materialized before any element goes through them, so it makes no sense to use elements to materialize them.

I had similar issues in that question, and ended up using a modified version of groupBy with signature

paramGroupBy[K, OO, MM](maxSubstreams: Int, f: Out => K, paramSubflow: K => Flow[Out, OO, MM])

that lets you define every substream using the key that created it. This can be modified to take the first element (instead of the key) as the parameter.

Another (probably simpler, in your case) way would be to write your own stage that does exactly what you want: get end-time from first element and cancel the stream at that time. Here is an example implementation for this (I used a scheduler instead of setting a state):

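import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic}
import scala.concurrent.duration.FiniteDuration

// Key identifying the single timer used by each CancelAfter instance
object CancelAfterTimer

class CancelAfter[T](getTimeout: T => FiniteDuration) extends GraphStage[FlowShape[T, T]] {
  val in = Inlet[T]("CancelAfter.in")
  val out = Outlet[T]("CancelAfter.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler {
    override def onPush(): Unit = {
      val elem = grab(in)
      // Only the first element starts the timer; later elements leave it untouched
      if (!isTimerActive(CancelAfterTimer))
        scheduleOnce(CancelAfterTimer, getTimeout(elem))
      push(out, elem)
    }

    override protected def onTimer(timerKey: Any): Unit =
      completeStage() // this will cancel the upstream and complete the downstream

    override def onPull(): Unit = pull(in)

    setHandlers(in, out, this)
  }
}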
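To plug the question's expires field into a stage like CancelAfter, the LocalDateTime has to be turned into a FiniteDuration. The following is a minimal sketch of such a conversion. ExpiryTimeout and timeUntilExpiry are hypothetical names introduced here, and clamping already-expired elements to a zero delay is an assumption about the desired behaviour.

```scala
import java.time.{Duration => JDuration, LocalDateTime}

import scala.concurrent.duration._

object ExpiryTimeout {
  // Same shape as the element type in the question
  case class Wid(id: Int, v: String, expires: LocalDateTime)

  // Hypothetical helper: time left until `w.expires`, never negative.
  // `now` is a parameter so the function can be tested deterministically.
  def timeUntilExpiry(w: Wid, now: LocalDateTime = LocalDateTime.now()): FiniteDuration = {
    val millis = JDuration.between(now, w.expires).toMillis
    math.max(millis, 0L).millis
  }

  // Possible wiring inside the question's pipeline (not compiled here):
  //   .groupBy(Int.MaxValue, _.id)
  //   .via(new CancelAfter[Wid](w => timeUntilExpiry(w)))
  //   .scan("")((a, b) => a + b.v)
  //   .mergeSubstreams
}
```

Placing the stage before scan means the substream, and with it the scan state, ends when the timer fires.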
Cyrille Corpet answered Oct 17 '22 07:10
