I can't figure out how to stop an Akka Streams RunnableGraph immediately. How do I use a KillSwitch to achieve this? I only started with Akka Streams a few days ago. In my case I read lines from a file, do some operations in a flow, and write to a sink. What I want is to stop reading the file immediately whenever I choose, which I hope would also stop the whole running graph. Any ideas on this would be greatly appreciated.
Thanks in advance.
Since Akka Streams 2.4.3, there is an elegant way to stop the stream from the outside via KillSwitch.
Consider the following example, which stops the stream after 10 seconds.
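import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, DelayOverflowStrategy, KillSwitches}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random

object ExampleStopStream extends App {
  implicit val system = ActorSystem("streams")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  // Endless source of random numbers, each delayed by 500 ms
  val source = Source
    .fromIterator(() => Iterator.continually(Random.nextInt(100)))
    .delay(500.millis, DelayOverflowStrategy.dropHead)
  val square = Flow[Int].map(x => x * x)
  val sink = Sink.foreach(println)

  // Keep the kill switch and the sink's completion future as materialized values
  val (killSwitch, done) =
    source.via(square)
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(sink)(Keep.both).run()

  // Stop the stream from the outside after 10 seconds
  system.scheduler.scheduleOnce(10.seconds) {
    println("Shutting down...")
    killSwitch.shutdown()
  }

  // Once the sink has completed, terminate the actor system
  done.foreach { _ =>
    println("I'm done")
    Await.result(system.terminate(), 1.seconds)
  }
}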
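For the file-reading case from the question, the same idea can be sketched as follows. This is only a minimal sketch under some assumptions: the object name `StopFileStream`, the method `processLines`, the path `input.txt`, the 8192-byte frame limit, and the `toUpperCase` step are all placeholders, not part of the original answer. The kill switch sits right after the framing stage, so `shutdown()` cancels the file source upstream and completes everything downstream.

```scala
import java.nio.file.{Path, Paths}
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches, Materializer, UniqueKillSwitch}
import akka.stream.scaladsl.{FileIO, Framing, Keep, Sink}
import akka.util.ByteString
import scala.concurrent.Future

object StopFileStream {
  // Reads the file line by line and returns the kill switch together with
  // the future that completes when the sink is done.
  def processLines(path: Path)(implicit mat: Materializer): (UniqueKillSwitch, Future[Done]) =
    FileIO.fromPath(path)
      // Split the byte stream into lines (frame limit is an assumed value)
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 8192, allowTruncation = true))
      // Place the kill switch close to the source
      .viaMat(KillSwitches.single)(Keep.right)
      // Placeholder for the real per-line processing
      .map(_.utf8String.toUpperCase)
      .toMat(Sink.foreach[String](println))(Keep.both)
      .run()
}

object StopFileStreamApp extends App {
  implicit val system: ActorSystem = ActorSystem("file-stream")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // Hypothetical input file; replace with the file you actually read
  val (killSwitch, done) = StopFileStream.processLines(Paths.get("input.txt"))

  // Call this from wherever you decide to stop reading
  killSwitch.shutdown()

  // Terminate the actor system whether the stream completed or failed
  done.onComplete(_ => system.terminate())
}
```

Note that `shutdown()` completes the stream gracefully, so elements already buffered downstream of the switch may still reach the sink. If the stream should end with a failure instead, `killSwitch.abort(exception)` fails the downstream stages with the given exception.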
Another way is to have a service or a shutdown hook call cancel on the Cancellable that the graph materializes.
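// Assumes akka.stream.scaladsl._, scala.concurrent.duration._ and scala.util.Random are imported, with an implicit materializer in scope
// Source.tick materializes a Cancellable; the tick element is evaluated once, so the same number is printed every second
val graph =
  Source.tick(0.seconds, 1.second, Random.nextInt())
    .to(Sink.foreach(println))
val cancellable = graph.run()
// Cancelling completes the tick source, which in turn completes the rest of the graph
cancellable.cancel()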
The call to cancellable.cancel() can be registered as a callback via ActorSystem.registerOnTermination.
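A rough sketch of wiring the Cancellable into a shutdown hook is shown below. The names `CancelOnShutdown` and `startTicks` and the `"tick"` element are made up for illustration, and the JVM hook via `sys.addShutdownHook` is just one possible reading of the "shutdown hook" mentioned above.

```scala
import akka.Done
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Future
import scala.concurrent.duration._

object CancelOnShutdown {
  // Runs a tick source and returns its Cancellable plus the sink's completion future
  def startTicks()(implicit mat: Materializer): (Cancellable, Future[Done]) =
    Source.tick(0.seconds, 1.second, "tick")
      .toMat(Sink.foreach[String](println))(Keep.both)
      .run()
}

object CancelOnShutdownApp extends App {
  implicit val system: ActorSystem = ActorSystem("ticks")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val (cancellable, done) = CancelOnShutdown.startTicks()

  // Option 1: JVM shutdown hook (runs on Ctrl+C or a normal JVM exit)
  sys.addShutdownHook {
    cancellable.cancel()
  }

  // Option 2: callback that runs when the actor system terminates
  system.registerOnTermination(cancellable.cancel())
}
```

Compared with a KillSwitch, this only works for sources that materialize a Cancellable, such as `Source.tick`.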