Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

What message does pipeTo send on a timeout or other failure?

Tags:

akka

In Akka, rather than using onComplete on a future response created with ?, I am trying to use pipeTo because that is supposedly the preferred pattern. However, I don't seem to be receiving any Throwables or Failures when the future times out. What should I expect to receive in my actor if a timeout occurs when using pipeTo? What about when a different exception is thrown? Example code:

class Simple(otherActor : ActorRef) extends Actor{
  def receive = {
     case "some_msg" => {
       val implicit timeout = Timeout(1 seconds)
       val response = otherActor ? "hello" 
       response pipeTo self
     }

     // case ??? // How do I handle timeouts?
  }
}

If no message is automatically sent when a timeout occurs, how am I supposed to handle timeouts with pipeTo?

like image 686
mushroom Avatar asked Dec 19 '13 03:12

mushroom


2 Answers

The failure of the future is sent as a akka.actor.Status.Failure message containing the exception. The exception for timeout is akka.pattern.AskTimeoutException.

like image 161
Patrik Nordwall Avatar answered Oct 10 '22 19:10

Patrik Nordwall


If your example closely matches your actual code, then I'm not sure pipeTo is what you want here. Piping the message back to yourself, to me, does not make too much sense and there are better solutions for a case of an actor sending a message to another actor and then waiting for a response. First though, let's talk about pipeTo. I think a good example of when to use pipeTo is if you had three actors, A, B, and C. A sends a message to B who in turn sends a message to C and that response from C should be returned to A after B does something else to it first. In that example, you could do something like this inside of B:

val fut = actorC ? someMessage
fut map(someMapFunc) pipeTo sender

Here, the pipeTo function helps to prevent you from accidentally closing over the mutable sender var if you were to instead use something like onComplete and respond to the sender inside of that callback.

Now, for your case, if you just want A to talk to B and then wait for a response from B (and handle potential timeouts), you could try something like this:

class ActorA extends Actor{
  import context._
  val myB = context.actorOf(Props[ActorB])

  def receive = {
    case msg =>
      myB ! msg
      setReceiveTimeout(2 seconds)
      become(waitingForResponse)
  }

  def waitingForResponse:Receive = {
    case ReceiveTimeout =>
      println("got a receive timeout")
      cancelReceiveTimeout

    case response =>
      println("got my response back")
      cancelReceiveTimeout
  }

  def cancelReceiveTimeout = setReceiveTimeout(Duration.Undefined) 
}

In this example, A starts with a default receive partial function. When it receives a message, it sends another message to B, sets a receive timeout for receiving a response from B and then switches it's receive function to something that is specific to waiting for that response from B. In that new receive function, I could either get my response from B in time or I could get a ReceiveTimeout, indicating that I did not get my response in time. In either case, I cancel my receive timeout because it repeats.

Now this is very simplified, but I was just trying to show one way to do a back-and-forth between two actors, which it seems like your example was showing.

like image 33
cmbaxter Avatar answered Oct 10 '22 19:10

cmbaxter