How to properly stop Akka streams from the outside

I'm designing a small tool that generates CSV test data, and I want to use Akka Streams (1.0-RC4) to implement the data flow. There will be a Source that generates random numbers, a transformation into CSV strings, a rate limiter, and a Sink that writes to a file.

There should also be a clean way to stop the tool through a small REST interface.

This is where I'm struggling. Once the stream has been started (RunnableFlow.run()), there seems to be no way to stop it. The Source and the Sink are both infinite (at least until the disk runs full :)), so neither of them will ever complete the stream.

Adding control logic to the Source or the Sink feels wrong. So does calling ActorSystem.shutdown(). What would be a good way to stop the stream?

asked Jul 13 '15 08:07 by ErosC


1 Answer

OK, so I found a decent solution. It was sitting right under my nose; I just did not see it. Source.lazyEmpty materializes into a Promise that, when completed, terminates the Source and the stream downstream of it.
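A minimal sketch of that behaviour on its own, assuming Akka Streams 1.0 on the classpath. The object name LazyEmptyDemo, the use of Sink.ignore, and the Int element type are choices made for this illustration only:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Promise

// Hypothetical demo object; assumes the Akka Streams 1.0 API.
object LazyEmptyDemo {
  def run(): Boolean = {
    implicit val system = ActorSystem("lazy-empty-demo")
    implicit val materializer = ActorMaterializer()

    // lazyEmpty never emits an element; running the stream hands back a Promise[Unit].
    val stopper: Promise[Unit] = Source.lazyEmpty[Int].to(Sink.ignore).run()

    // Completing the promise completes the source, which ends the stream.
    stopper.success(())

    system.shutdown()
    stopper.isCompleted
  }
}
```

On its own this stream carries no data, which is why the promise still has to be combined with the real source.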

The remaining question was how to integrate it into the infinite stream of random numbers. I tried Zip. The result was that no random numbers made it through the stream, because Zip needs an element from every input and lazyEmpty never emits one (d'oh). I tried Merge, but the stream never terminated, because Merge keeps running until all of its sources have completed.

So I wrote my own merge. It forwards all values from one of its input ports and terminates as soon as either source completes.

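// Imports assumed for Akka Streams 1.0; adjust them to your version.
import akka.stream.{Attributes, FanInShape}
import akka.stream.FanInShape.{Init, Name}
import akka.stream.scaladsl.{FlexiMerge, Flow, Source}
import akka.stream.scaladsl.FlowGraph.Implicits._
import scala.concurrent.Promise

object StopperFlow {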

  private class StopperMergeShape[A](_init: Init[A] = Name("StopperFlow")) extends FanInShape[A](_init) {
    val in = newInlet[A]("in")
    val stopper = newInlet[Unit]("stopper")

    override protected def construct(init: Init[A]): FanInShape[A] = new StopperMergeShape[A](init)
  }

  private class StopperMerge[In] extends FlexiMerge[In, StopperMergeShape[In]](
    new StopperMergeShape(), Attributes.name("StopperMerge")) {
    import FlexiMerge._

    override def createMergeLogic(p: PortT) = new MergeLogic[In] {
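      // Only ever read from the data port; the stopper port never emits.
      override def initialState =
        State[In](Read(p.in)) { (ctx, input, element) =>
          ctx.emit(element)
          SameState
        }

      // Complete the merge as soon as any input completes.
      override def initialCompletionHandling = eagerClose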
    }
  }

  def apply[In](): Flow[In, In, Promise[Unit]] = {
    val stopperSource = Source.lazyEmpty[Unit]

    Flow(stopperSource) { implicit builder =>
      stopper =>
        val stopperMerge = builder.add(new StopperMerge[In]())

        stopper ~> stopperMerge.stopper

        (stopperMerge.in, stopperMerge.out)
    }
  }    
}
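The termination rule this merge implements can also be sketched without Akka. The following is only a plain-Scala analogy on iterators, not the stream stage itself; EagerStopSketch, stoppable and demo are hypothetical names introduced for the illustration:

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.Promise

// Plain-Scala analogy only; it does not use Akka Streams.
object EagerStopSketch {
  // Forwards elements of `in` until `in` is exhausted or `stopper` is completed,
  // whichever happens first.
  def stoppable[A](in: Iterator[A], stopper: Promise[Unit]): Iterator[A] =
    in.takeWhile(_ => !stopper.isCompleted)

  // Pulls from an infinite iterator and completes the promise after the third element.
  def demo(): List[Int] = {
    val stopper = Promise[Unit]()
    val out = stoppable(Iterator.from(1), stopper)
    val collected = ListBuffer.empty[Int]
    while (out.hasNext) {
      val n = out.next()
      collected += n
      if (n == 3) stopper.success(())
    }
    collected.toList
  }
}
```

As with eagerClose, the data side and the stopper side can each end the output: a finite input simply runs out, while an infinite one stops once the promise is completed.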

The flow can be plugged into any stream. When materialized, it yields a Promise that terminates the stream once it is completed. Here's my test for it.

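// Imports assumed for Akka Streams 1.0.
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.util.Random

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()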

val startTime = System.currentTimeMillis()

def dumpToConsole(f: Float) = {
  val timeSinceStart = System.currentTimeMillis() - startTime
  System.out.println(s"[$timeSinceStart] - Random number: $f")
}

val randomSource = Source(() => Iterator.continually(Random.nextFloat()))
val consoleSink = Sink.foreach(dumpToConsole)
val flow = randomSource.viaMat(StopperFlow())(Keep.both).to(consoleSink)

val (_, promise) = flow.run()

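Thread.sleep(1000)            // let the stream run for a second
val _ = promise.success(())   // complete the promise to stop the stream
Thread.sleep(1000)            // no further output should appear
system.shutdown()             // release the actor system so the JVM can exit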

I hope this is useful for others too. It still leaves me puzzled why there is no built-in way to terminate a stream from outside of it.

answered Oct 10 '22 04:10 by ErosC