Resolving Akka futures from ask in the event of a failure

Tags: scala, akka, spray

I am calling an Actor using the ask pattern within a Spray application, and returning the result as the HTTP response. I map failures from the actor to a custom error code.

val authActor = context.actorOf(Props[AuthenticationActor])

callService((authActor ? TokenAuthenticationRequest(token)).mapTo[LoggedInUser]) { user =>
  complete(StatusCodes.OK, user)
}

def callService[T](f: => Future[T])(cb: T => RequestContext => Unit) = {
  onComplete(f) {
    // No type test on T here: T is erased at runtime, so `value: T` would be an unchecked match.
    case Success(value) => cb(value)
    case Failure(ex: ServiceException) => complete(ex.statusCode, ex.errorMessage)
    // In reality this returns a custom error object.
    case _ => complete(StatusCodes.InternalServerError, "Unable to complete the request. Please try again later.")
  }
}

This works correctly when the authActor sends a failure, but if the authActor throws an exception, nothing happens until the ask timeout expires. For example:

override def receive: Receive = {
  case _ => throw new ServiceException(ErrorCodes.AuthenticationFailed, "No valid session was found for that token")
}

I know that the Akka docs say that

To complete the future with an exception you need send a Failure message to the sender. This is not done automatically when an actor throws an exception while processing a message.

But given that I use asks for a lot of the interface between the Spray routing actors and the service actors, I would rather not wrap the receive part of every child actor with a try/catch. Is there a better way to achieve automatic handling of exceptions in child actors, and immediately resolve the future in the event of an exception?

Edit: this is my current solution. However, it's quite messy to do this for every child actor.

override def receive: Receive = {
  case default =>
    try {
      default match {
        case _ => throw new ServiceException("") // Actual code would go here
      }
    } catch {
      case se: ServiceException =>
        logger.error("Service error raised:", se)
        sender ! Status.Failure(se) // akka.actor.Status.Failure, not scala.util.Failure
      case ex: Exception =>
        sender ! Status.Failure(ex)
        throw ex
    }
}

That way, if it's an expected error (i.e. ServiceException), it is reported to the sender as a failure and not rethrown. If it's unexpected, a failure is still sent immediately so the future is resolved, but the exception is then rethrown so it can still be handled by the SupervisorStrategy.
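
One way to avoid repeating this try/catch in every child actor is to factor it into a wrapper around the receive block. The following is only a minimal sketch under assumptions: the trait FailureReplying, its method replyingOnFailure, and the example actor EchoOrFailActor are hypothetical names, not part of Akka, and the "expected" predicate stands in for the ServiceException check above.

```scala
import akka.actor.{Actor, Status}
import scala.util.control.NonFatal

// Hypothetical helper (not an Akka API): wraps a Receive so that any non-fatal
// exception is first sent to the sender as Status.Failure. Exceptions for which
// `expected` returns false are then rethrown so the SupervisorStrategy sees them.
trait FailureReplying { this: Actor =>
  def replyingOnFailure(expected: Throwable => Boolean)(inner: Receive): Receive = {
    case msg if inner.isDefinedAt(msg) =>
      try inner(msg)
      catch {
        case NonFatal(ex) =>
          sender() ! Status.Failure(ex)
          if (!expected(ex)) throw ex
      }
  }
}

// Illustrative actor: only the wrapper call is added around the usual receive block.
class EchoOrFailActor extends Actor with FailureReplying {
  def receive: Receive = replyingOnFailure(_.isInstanceOf[IllegalStateException]) {
    case "boom" => throw new IllegalStateException("expected failure")
    case other  => sender() ! other
  }
}
```

With this shape, each child actor keeps its normal pattern match and the error handling lives in one place. An ask for "boom" should fail with the IllegalStateException instead of waiting for the timeout.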

James Tidman asked Apr 22 '15 10:04


1 Answer

If you want a way to provide automatic sending of a response back to the sender in case of an unexpected exception, then something like this could work for you:

trait FailurePropatingActor extends Actor{
  override def preRestart(reason:Throwable, message:Option[Any]): Unit = {
    super.preRestart(reason, message)
    sender() ! Status.Failure(reason)
  }
}

We override preRestart and propagate the failure back to the sender as a Status.Failure, which causes the upstream Future to fail. It's also important to call super.preRestart here, as that's where child stopping happens. Using this in an actor looks something like this:

case class GetElement(list:List[Int], index:Int)
class MySimpleActor extends FailurePropatingActor {  
  def receive = {
    case GetElement(list, i) =>
      val result = list(i)
      sender() ! result
  }  
}

If I were to call an instance of this actor like so:

import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

val system = ActorSystem("test")
import system.dispatcher
implicit val timeout: Timeout = Timeout(2.seconds)
val ref = system.actorOf(Props[MySimpleActor])
// Index 6 is out of bounds for a three-element list, so list(i) throws in the actor.
val fut = ref ? GetElement(List(1,2,3), 6)

fut onComplete{
  case scala.util.Success(result) =>
    println(s"success: $result")

  case scala.util.Failure(ex) =>
    println(s"FAIL: ${ex.getMessage}")
    ex.printStackTrace()
}

Then it would properly hit my Failure block. The code in that base trait works well when Futures are not involved in the actor that extends it, as in the simple actor here. If you use Futures, you need to be careful: exceptions that happen inside the Future don't cause the actor to restart, and, in preRestart, the call to sender() would not return the correct ref anyway, because the actor has already moved on to the next message. An actor like this shows the issue:

class MyBadFutureUsingActor extends FailurePropatingActor{
  import context.dispatcher

  def receive = {
    case GetElement(list, i) => 
      val orig = sender()
      val fut = Future{
        val result = list(i)
        orig ! result
      }      
  } 
}

If we were to use this actor in the previous test code, we would always get a timeout in the failure situation. To mitigate that, you need to pipe the results of futures back to the sender like so:

class MyGoodFutureUsingActor extends FailurePropatingActor{
  import context.dispatcher
  import akka.pattern.pipe

  def receive = {
    case GetElement(list, i) => 
      val fut = Future{
        list(i)
      }

      fut pipeTo sender()
  } 
}

In this particular case, the actor itself is not restarted because it did not encounter an uncaught exception. Now, if your actor needed to do some additional processing after the future, you can pipe back to self and explicitly fail when you get a Status.Failure:

class MyGoodFutureUsingActor extends FailurePropatingActor{
  import context.dispatcher
  import akka.pattern.pipe

  def receive = {
    case GetElement(list, i) => 
      val fut = Future{
        list(i)
      }

      fut.to(self, sender())

    // list(i) yields an Int, so match Int; a Double pattern would never see the piped result
    case n:Int =>
      sender() ! n * 2

    case Status.Failure(ex) =>
      throw ex
  } 
}

If that behavior becomes common, you can make it available to whatever actors need it like so:

trait StatusFailureHandling{ me:Actor =>
  def failureHandling:Receive = {
    case Status.Failure(ex) =>
      throw ex      
  }
}

class MyGoodFutureUsingActor extends FailurePropatingActor with StatusFailureHandling{
  import context.dispatcher
  import akka.pattern.pipe

  def receive = myReceive orElse failureHandling

  def myReceive:Receive = {
    case GetElement(list, i) => 
      val fut = Future{
        list(i)
      }

      fut.to(self, sender())

    // list(i) yields an Int, so match Int; a Double pattern would never see the piped result
    case n:Int =>
      sender() ! n * 2
  } 
}  
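
To see the two traits working together, here is a minimal end-to-end sketch, assuming Akka classic 2.4 or later (for system.terminate()). The names DoublingActor and PipeToSelfCheck are made up for illustration, and the traits are repeated so the block is self-contained. When the piped Future fails, the Status.Failure reaches the actor with the original asker as its sender (because of the explicit sender passed to `to`), failureHandling rethrows it, and preRestart then forwards it to that asker.

```scala
import akka.actor.{Actor, ActorSystem, Props, Status}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

case class GetElement(list: List[Int], index: Int)

// On restart, report the cause to whoever sent the message being processed.
trait FailurePropatingActor extends Actor {
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    super.preRestart(reason, message)
    sender() ! Status.Failure(reason)
  }
}

// Turn a piped Status.Failure back into a thrown exception.
trait StatusFailureHandling { me: Actor =>
  def failureHandling: Receive = { case Status.Failure(ex) => throw ex }
}

// Illustrative actor: looks up an element in a Future, then doubles it.
class DoublingActor extends FailurePropatingActor with StatusFailureHandling {
  import context.dispatcher

  def receive: Receive = myReceive orElse failureHandling

  def myReceive: Receive = {
    // Pipe to self, keeping the original asker as the sender of the piped message.
    case GetElement(list, i) => Future(list(i)).to(self, sender())
    case n: Int              => sender() ! n * 2
  }
}

object PipeToSelfCheck {
  // Returns (result of a valid lookup, result of an out-of-bounds lookup).
  def run(): (Try[Any], Try[Any]) = {
    val system = ActorSystem("pipe-check")
    implicit val timeout: Timeout = Timeout(5.seconds)
    try {
      val ref = system.actorOf(Props(new DoublingActor))
      val ok  = Try(Await.result(ref ? GetElement(List(1, 2, 3), 1), 5.seconds))
      val bad = Try(Await.result(ref ? GetElement(List(1, 2, 3), 6), 5.seconds))
      (ok, bad)
    } finally system.terminate()
  }
}
```

Calling PipeToSelfCheck.run() should give a successful first result and a second result that fails with the IndexOutOfBoundsException from the lookup rather than with an ask timeout.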
cmbaxter answered Oct 12 '22 11:10