 

Proper way to stop Akka Streams on condition

I have been successfully using FileIO to stream the contents of a file, compute some transformations for each line and aggregate/reduce the results.

Now I have a pretty specific use case, where I would like to stop the stream when a condition is reached, so that it is not necessary to read the whole file but the process finishes as soon as possible. What is the recommended way to achieve this?

jarandaf asked Jul 12 '16 09:07




1 Answer

If the stop condition is "on the outside of the stream"

There is an advanced building block called KillSwitch that you can use for this: http://doc.akka.io/japi/akka/2.4.7/akka/stream/KillSwitches.html The stream is shut down as soon as the kill switch is triggered.

It has methods such as shutdown() and abort(reason): shutdown() completes the stream normally, while abort(reason) fails it with the given exception. See its API here: http://doc.akka.io/japi/akka/2.4.7/akka/stream/SharedKillSwitch.html

Reference documentation is here: http://doc.akka.io/docs/akka/2.4.8/scala/stream/stream-dynamic.html#kill-switch-scala

Example usage would be:

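import akka.actor.ActorSystem
import akka.stream.{DelayOverflowStrategy, KillSwitches}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Akka 2.6+ derives the Materializer from an implicit ActorSystem
implicit val system: ActorSystem = ActorSystem("kill-switch-example")

// Emits 1, 2, 3, ..., each element delayed by one second
val countingSrc = Source.fromIterator(() => Iterator.from(1))
  .delay(1.second, DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]

// Materialize both the kill switch and the Future of the last element
val (killSwitch, last) = countingSrc
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(lastSnk)(Keep.both)
  .run()

doSomethingElse() // placeholder for whatever else the application does meanwhile

// Complete the stream from the outside; Sink.last then yields the last element seen
killSwitch.shutdown()

println(Await.result(last, 1.second))
system.terminate()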
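The example above uses KillSwitches.single, which controls exactly one materialized stream. When the same external condition should stop several streams at once, KillSwitches.shared creates a SharedKillSwitch whose flow can be inserted into any number of them. The sketch below is written as a script (e.g. for scala-cli or Ammonite) and assumes Akka 2.6 or later; the two tick sources are hypothetical stand-ins for long-running streams, and it uses abort rather than shutdown so both streams end with a failure.

```scala
import akka.actor.ActorSystem
import akka.stream.KillSwitches
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

implicit val system: ActorSystem = ActorSystem("shared-kill-switch-example")

// One switch, created up front, that can be attached to any number of streams
val sharedKillSwitch = KillSwitches.shared("stop-condition")

// Hypothetical unbounded source: emits 1 every 100 ms
val ticks = Source.tick(0.millis, 100.millis, 1)

val doneA = ticks.via(sharedKillSwitch.flow).runWith(Sink.fold[Int, Int](0)(_ + _))
val doneB = ticks.via(sharedKillSwitch.flow).runWith(Sink.fold[Int, Int](0)(_ + _))

Thread.sleep(500) // let both streams run for a while

// Fails every stream the switch is attached to; shutdown() would complete them instead
sharedKillSwitch.abort(new RuntimeException("stop condition reached"))

val resultA = Try(Await.result(doneA, 3.seconds))
val resultB = Try(Await.result(doneB, 3.seconds))
println(s"A: $resultA, B: $resultB")

system.terminate()
```

Because the switch exists before any stream is materialized, the code that detects the stop condition does not need a handle on each individual stream.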

If the stop condition is inside the stream

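You can use takeWhile to express almost any condition on the elements. For simpler cases such as "take 10 lines", take(10) is enough. Note that limit(10) behaves differently: it fails the stream with a StreamLimitReachedException if more than 10 elements arrive, instead of just stopping.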
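Applied to the file-reading pipeline described in the question, takeWhile might look like the sketch below. It is a script assuming Akka 2.6 or later; the input file, its "one number per line" format, and the "STOP" marker are all hypothetical. When the predicate first returns false, takeWhile completes downstream and cancels upstream, so FileIO stops reading the rest of the file.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{FileIO, Framing, Sink}
import akka.util.ByteString
import java.nio.file.Files
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("take-while-example")

// Hypothetical input: one number per line, "STOP" marks where reading may end
val path = Files.createTempFile("numbers", ".txt")
Files.write(path, "1\n2\n3\nSTOP\n4\n5\n".getBytes("UTF-8"))

val sumFuture = FileIO.fromPath(path)
  // Split the raw byte chunks into lines
  .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
  .map(_.utf8String)
  // Complete as soon as the condition fails; the file source is cancelled
  .takeWhile(line => line != "STOP")
  .map(_.toInt)
  // Aggregate only the lines seen before the stop condition
  .runWith(Sink.fold[Int, Int](0)(_ + _))

val sum = Await.result(sumFuture, 3.seconds)
println(sum)

system.terminate()
```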

If your logic is more advanced, you can build a dedicated stage for it, for example with statefulMapConcat, which lets you keep arbitrary state across elements. statefulMapConcat cannot complete the stream by itself, but combined with takeWhile it lets you decide "from the inside" exactly when the stream should complete.
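Here is a minimal sketch of that idea, assuming Akka 2.6 or later and written as a script. The stop rule (a running total exceeding a threshold) is a hypothetical example of a condition that depends on more than the current element. statefulMapConcat only emits elements, so it tags each one with a "keep going" flag and the following takeWhile performs the actual completion.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("stateful-stop-example")

// Hypothetical rule: stop once the running total of the elements exceeds 10
val threshold = 10

val consumed = Source(1 to 1000)
  .statefulMapConcat { () =>
    var runningTotal = 0 // state local to this materialization of the stream
    elem => {
      runningTotal += elem
      // Tag each element with whether the stream should continue with it
      List((elem, runningTotal <= threshold))
    }
  }
  // The first element tagged false completes the stream and cancels the source
  .takeWhile { case (_, keepGoing) => keepGoing }
  .map { case (elem, _) => elem }
  .runWith(Sink.seq)

val result = Await.result(consumed, 3.seconds)
println(result)

system.terminate()
```

The running totals are 1, 3, 6, 10, 15, so the stream completes when the fifth element arrives and only the first four elements reach the sink; the remaining 995 elements are never pulled from the source.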

Konrad 'ktoso' Malawski answered Sep 18 '22 18:09