
Why is Akka Streams swallowing my exceptions?

Why is the exception in

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

object TestExceptionHandling {
  def main(args: Array[String]): Unit = {
    implicit val actorSystem = ActorSystem()
    implicit val materializer = ActorMaterializer()(actorSystem)

    Source(List(1, 2, 3)).map { i =>
      if (i == 2) {
        throw new RuntimeException("Please, don't swallow me!")
      } else {
        i
      }
    }.runForeach { i =>
      println(s"Received $i")
    }
  }
}

silently ignored? I can see that the stream stops after printing Received 1, but nothing is logged. Note that the problem is not the logging configuration in general: I see plenty of output when I set akka.log-config-on-start = on in my application.conf file.

Matthias Langer asked Feb 25 '16 15:02



2 Answers

I'm now using a custom Supervision.Decider that makes sure exceptions are properly logged. It can be set up like this:

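import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}

// `logger` is assumed to be whatever logger you already use (e.g. an SLF4J Logger)
val decider: Supervision.Decider = { e =>
  logger.error("Unhandled exception in stream", e)
  Supervision.Stop
}

implicit val actorSystem = ActorSystem()
val materializerSettings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)
implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)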
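A decider does not have to be installed on the materializer. It can also be attached to a single flow through attributes. The following is only a sketch of that variant, not the setup from this answer: the object name PerStreamDecider, the loggingResume decider and the use of println instead of a real logger are all illustrative assumptions, and it uses Supervision.Resume rather than Supervision.Stop so that the effect is visible in the result.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object PerStreamDecider {
  // Hypothetical decider: report the failure, then drop the failing element.
  // Replace println with your own logger.
  val loggingResume: Supervision.Decider = { e =>
    println(s"Unhandled exception in stream: $e")
    Supervision.Resume
  }

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("per-stream-decider")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // The supervision strategy applies only to the stages of this flow
      val risky = Flow[Int]
        .map(i => if (i == 2) throw new RuntimeException("Please, don't swallow me!") else i)
        .withAttributes(ActorAttributes.supervisionStrategy(loggingResume))

      val result = Source(List(1, 2, 3)).via(risky).runWith(Sink.seq)
      // Blocking is for demonstration only
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

With Resume the element 2 is dropped and the stream continues. With Supervision.Stop, as in the decider above, the stream would fail after the message is printed.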

Also, as has been pointed out by Viktor Klang, in the example given above, the exception could also be "caught" via

import scala.util.{Failure, Success}
import actorSystem.dispatcher // ExecutionContext required by onComplete

Source(List(1, 2, 3)).map { i =>
  if (i == 2) {
    throw new RuntimeException("Please, don't swallow me!")
  } else {
    i
  }
}.runForeach { i =>
  println(s"Received $i")
}.onComplete {
  case Success(_) =>
    println("Done")
  case Failure(e) =>
    println(s"Failed with $e")
}

Note however, that this approach won't help you with

Source(List(1, 2, 3)).map { i =>
  if (i == 2) {
    throw new RuntimeException("Please, don't swallow me!")
  } else {
    i
  }
}.to(Sink.foreach { i =>
  println(s"Received $i")
}).run()

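since here run() returns only the materialized value of the Source (Unit in early Akka Streams versions, NotUsed in later ones), not the Future created by Sink.foreach.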
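To keep the Future that Sink.foreach produces, the source and sink can be combined with toMat and Keep.right instead of to. The following is a minimal sketch of that idea, assuming an Akka version in which ActorMaterializer is still available. The object name KeepSinkFuture is made up for illustration, and the Await call is there only so that the example terminates deterministically; in real code you would attach onComplete as shown earlier.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object KeepSinkFuture {
  // Runs the stream and reports how it ended
  def run(): Try[Any] = {
    implicit val system: ActorSystem = ActorSystem("keep-right")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val done = Source(List(1, 2, 3))
        .map(i => if (i == 2) throw new RuntimeException("Please, don't swallow me!") else i)
        // Keep.right selects the sink's materialized value (a Future)
        .toMat(Sink.foreach[Int](i => println(s"Received $i")))(Keep.right)
        .run()
      Try(Await.result(done, 5.seconds))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

Here run() yields a Failure wrapping the RuntimeException, so the error is no longer lost even though the sink is attached explicitly.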

Matthias Langer answered Oct 10 '22 20:10


I had similar questions when I started using akka-streams. Supervision.Decider helps, but not always.

Unfortunately, it doesn't catch exceptions thrown in an ActorPublisher. I can see that the exception is handled and that ActorPublisher.onError is called, but it never reaches the Supervision.Decider. It does work with the simple stream given in the documentation.

Errors also don't reach the actor if I use Sink.actorRef.

And for the sake of experiment, I tried the following sample:

val stream = Source(0 to 5).map(100 / _)
stream.runWith(Sink.actorSubscriber(props))

In this case the exception was caught by the Decider but never reached the actor subscriber.

Overall, I think this behavior is inconsistent: I cannot use a single mechanism for handling errors in a stream.

My original SO question: Custom Supervision.Decider doesn't catch exception produced by ActorPublisher

And here is the Akka issue where it is tracked: https://github.com/akka/akka/issues/18359

expert answered Oct 10 '22 20:10