
Invoking a Future inside a receive method and stopping the actor after that

I'm creating a bunch of actors that do some work and then they are stopped. In some of these actors I'm invoking 3rd party APIs that return a Future.

class MyActor extends Actor
{
....

def receive = {

   case MsgA => {
    ...
    // this evaluates to a Future
    val p: Future[_] = someFutureAPICall()
    // stop the actor
    context stop self
    }

}

}

Now, since the Future is non-blocking, the actor will be stopped right after that (?), even if the Future has not completed. What is the expected behavior in this case?

For example, if I have an onComplete on the Future, will it ever be executed even if the actor has stopped?

class MyActor extends Actor
{
....

def receive = {

   case MsgA => {
    ...
    // this evaluates to a Future
    val p: Future[_] = someFutureAPICall()

    p.onComplete {
      // will these expressions ever be evaluated?
      case Success(x) => log.info("Success")
      case Failure(f) => log.info("Failure")
    }
    // stop the actor
    context stop self
    }

}

}
Soumya Simanta asked Jul 22 '14 04:07


1 Answer

Yes, the code that returns the Future (the 3rd party API) will execute immediately and return an incomplete Future.

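Whether this Future runs to completion has nothing to do with whether the actor that started it is still alive. An onComplete callback registered on it runs on the ExecutionContext it was given, not inside the actor, so it is still evaluated after the actor has stopped.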
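
To illustrate the point without Akka, here is a minimal sketch using only scala.concurrent. The names FutureOutlivesCaller and startAndForget are made up for this example, and someFutureAPICall is a hypothetical stand-in for the 3rd party API. The method that starts the Future returns right away, much like a receive handler that stops its actor, yet the callback still fires later:

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object FutureOutlivesCaller {
  // Hypothetical stand-in for the 3rd party API: completes after a short delay.
  def someFutureAPICall(): Future[Int] = Future {
    Thread.sleep(200)
    42
  }

  // Plays the role of the message handler: starts the Future, registers a
  // callback and returns immediately without waiting for the result.
  def startAndForget(done: Promise[String]): Unit = {
    someFutureAPICall().onComplete {
      case Success(x) => done.success(s"Success: $x")
      case Failure(e) => done.success(s"Failure: ${e.getMessage}")
    }
  }

  def main(args: Array[String]): Unit = {
    val done = Promise[String]()
    startAndForget(done) // has already returned; the Future is still running
    // Block only so the JVM stays alive long enough to observe the callback.
    println(Await.result(done.future, 5.seconds)) // prints "Success: 42"
  }
}
```

The Await at the end is only there so the demo can print the outcome; inside an actor you would not block like this.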

If you don't need that actor anymore, you don't need to wait for the Future to complete and you can stop the actor like you do in the first example.

If you need to do something in that actor with the result of the Future, you can install an onComplete callback on it. Once the Future is complete, the callback can send the result to the actor, followed by a message telling it to stop. For example:

val myActor = self // Don't close over unstable actor reference by using self directly
p.onComplete {
  case Success(x) => myActor ! x; myActor ! akka.actor.PoisonPill // sends result to be processed and then stops actor
  case Failure(f) => myActor ! akka.actor.PoisonPill // stops actor
}

EDIT

Another alternative, as suggested in the comments, is to use the pipeTo pattern. It does almost the same thing. Here is how it's implemented in the Akka library:

def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = Actor.noSender): Future[T] = {
  future onComplete {
    case Success(r) ⇒ recipient ! r
    case Failure(f) ⇒ recipient ! Status.Failure(f)
  }
  future
}

Here is how you can call it after the Future is created (it needs import akka.pattern.pipe and an implicit ExecutionContext in scope):

p pipeTo myActor

The main difference is that your actor will have to shut itself down after receiving the messages, and a failure is clearly communicated to the actor as a Status.Failure message. This method is a bit safer to use because you have to pass an ActorRef and you don't have to remember to copy it (self) into a variable.
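
Putting the pipeTo variant together, a rough sketch of such an actor could look like the following. It assumes classic Akka actors, that MsgA is a case object, and that the API returns a Future[String]; someFutureAPICall here is a hypothetical stand-in, not the real 3rd party call:

```scala
import akka.actor.{Actor, ActorLogging, Status}
import akka.pattern.pipe
import scala.concurrent.Future

case object MsgA

class MyActor extends Actor with ActorLogging {
  // pipeTo and Future.apply need an implicit ExecutionContext;
  // here the actor's own dispatcher is used.
  import context.dispatcher

  // Hypothetical stand-in for the 3rd party API.
  def someFutureAPICall(): Future[String] = Future("result")

  def receive: Receive = {
    case MsgA =>
      // Do not stop here: the outcome arrives later as an ordinary message.
      someFutureAPICall() pipeTo self

    case Status.Failure(e) =>
      // pipeTo wraps a failed Future in Status.Failure.
      log.error(e, "Future failed")
      context stop self

    case result: String =>
      log.info("Future succeeded: {}", result)
      context stop self
  }
}
```

Because the result comes back through the mailbox, it is handled on the actor's own thread of execution, so touching actor state in those two cases is safe.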

yǝsʞǝla answered Nov 19 '22 05:11