Why is the exception in
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

object TestExceptionHandling {
  def main(args: Array[String]): Unit = {
    implicit val actorSystem = ActorSystem()
    implicit val materializer = ActorMaterializer()(actorSystem)

    Source(List(1, 2, 3)).map { i =>
      if (i == 2) {
        throw new RuntimeException("Please, don't swallow me!")
      } else {
        i
      }
    }.runForeach { i =>
      println(s"Received $i")
    }
  }
}
silently ignored? I can see that the stream gets stopped after printing Received 1, but nothing is logged. Note that the problem is not the logging configuration in general, as I see a lot of output if I set akka.log-config-on-start = on in my application.conf file.
Akka Streams is a library to process and transfer a sequence of elements using bounded buffer space. This latter property is what we refer to as boundedness, and it is the defining feature of Akka Streams.
Unlike heavier “streaming data processing” frameworks, Akka Streams are neither “deployed” nor automatically distributed.
In other words, data flows through the graph as a response to demand from receivers. Producers then comply and send more elements downstream. A second (transparent) protocol kicks in when production of elements is faster than demand. This protocol (backpressure) slows down the producers and ensures no data is lost.
Source[String, akka.NotUsed] = ... Sink: This is the exit point of your stream. There must be at least one in every stream. The Sink is the last element of our stream. Basically, it is a subscriber of the data sent or processed by a Source.
I'm now using a custom Supervision.Decider that makes sure exceptions are properly logged. It can be set up like this:
val decider: Supervision.Decider = { e =>
  logger.error("Unhandled exception in stream", e)
  Supervision.Stop
}

implicit val actorSystem = ActorSystem()
val materializerSettings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)
implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)
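The decider does not have to be global. As a minimal sketch, assuming Akka 2.4 or 2.5 (where ActorMaterializer and Sink.seq are available), a decider can also be attached to a single flow through ActorAttributes.supervisionStrategy. The object name PerStreamDecider, the loggingResume decider and the use of println in place of a real logger are illustrative assumptions, not part of the original answer:

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; names are illustrative only.
object PerStreamDecider {
  // Report the failure, then drop the failing element and keep going.
  val loggingResume: Supervision.Decider = { e =>
    println(s"Unhandled exception in stream: $e")
    Supervision.Resume
  }

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("per-stream-decider")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // The supervision attribute applies only to this flow,
      // not to every stream run by the materializer.
      val failingFlow = Flow[Int]
        .map(i => if (i == 2) throw new RuntimeException("Please, don't swallow me!") else i)
        .withAttributes(ActorAttributes.supervisionStrategy(loggingResume))

      val result = Source(List(1, 2, 3)).via(failingFlow).runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

With Supervision.Resume the failing element is dropped, so PerStreamDecider.run() should report the exception once and return the remaining elements. Swapping in Supervision.Stop gives the same fail-fast behavior as the materializer-wide decider above.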
Also, as Viktor Klang pointed out, in the example given above the exception can also be "caught" via the Future that runForeach returns:
import scala.util.{Failure, Success}
import actorSystem.dispatcher // ExecutionContext required by onComplete

Source(List(1, 2, 3)).map { i =>
  if (i == 2) {
    throw new RuntimeException("Please, don't swallow me!")
  } else {
    i
  }
}.runForeach { i =>
  println(s"Received $i")
}.onComplete {
  case Success(_) =>
    println("Done")
  case Failure(e) =>
    println(s"Failed with $e")
}
Note however, that this approach won't help you with
Source(List(1, 2, 3)).map { i =>
  if (i == 2) {
    throw new RuntimeException("Please, don't swallow me!")
  } else {
    i
  }
}.to(Sink.foreach { i =>
  println(s"Received $i")
}).run()
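since run() there returns only the source's materialized value (Unit in the Akka version used here), not the sink's Future, so there is nothing to attach onComplete to.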
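One possible way around this, sketched under the assumption of Akka 2.4 or 2.5 (where Sink.foreach materializes a Future[Done]), is to combine the stages with toMat and Keep.right so that run() hands back the sink's Future instead of the source's value. The object name KeepSinkFuture is hypothetical:

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical example object; names are illustrative only.
object KeepSinkFuture {
  def run(): Try[Done] = {
    implicit val system: ActorSystem = ActorSystem("keep-right")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Keep.right selects the sink's materialized value (a Future[Done]),
      // whereas plain `to` keeps the source's value.
      val done: Future[Done] = Source(List(1, 2, 3))
        .map(i => if (i == 2) throw new RuntimeException("Please, don't swallow me!") else i)
        .toMat(Sink.foreach[Int](i => println(s"Received $i")))(Keep.right)
        .run()

      // Block only for the sake of the example; the failure is carried by the Future.
      Try(Await.result(done, 5.seconds))
    } finally {
      system.terminate()
    }
  }
}
```

The returned Try is a Failure wrapping the RuntimeException, which can then be logged or handled like any other failed Future. In real code, onComplete or recover on the Future would usually be preferable to blocking with Await.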
I had similar questions when I started using akka-streams. Supervision.Decider helps, but not always.
Unfortunately, it doesn't catch exceptions thrown in an ActorPublisher. I can see that the error is handled and ActorPublisher.onError is called, but it doesn't reach the Supervision.Decider. It does work with the simple stream provided in the documentation.
Errors also don't reach the actor if I use Sink.actorRef.
As an experiment, I tried the following sample:
val stream = Source(0 to 5).map(100 / _)
stream.runWith(Sink.actorSubscriber(props))
In this case the exception was caught by the Decider but never reached the actor subscriber.
Overall, I think this is inconsistent behavior: I cannot use a single mechanism for handling errors in a stream.
My original SO question: Custom Supervision.Decider doesn't catch exception produced by ActorPublisher
And here is the Akka issue where it is tracked: https://github.com/akka/akka/issues/18359