In Akka, rather than using onComplete on a future response created with ?, I am trying to use pipeTo because that is supposedly the preferred pattern. However, I don't seem to be receiving any Throwables or Failures when the future times out. What should I expect to receive in my actor if a timeout occurs when using pipeTo? What about when a different exception is thrown? Example code:
class Simple(otherActor : ActorRef) extends Actor{
def receive = {
case "some_msg" => {
val implicit timeout = Timeout(1 seconds)
val response = otherActor ? "hello"
response pipeTo self
}
// case ??? // How do I handle timeouts?
}
}
If no message is automatically sent when a timeout occurs, how am I supposed to handle timeouts with pipeTo?
The failure of the future is sent as a akka.actor.Status.Failure
message containing the exception. The exception for timeout is akka.pattern.AskTimeoutException
.
If your example closely matches your actual code, then I'm not sure pipeTo
is what you want here. Piping the message back to yourself, to me, does not make too much sense and there are better solutions for a case of an actor sending a message to another actor and then waiting for a response. First though, let's talk about pipeTo
. I think a good example of when to use pipeTo
is if you had three actors, A, B, and C. A sends a message to B who in turn sends a message to C and that response from C should be returned to A after B does something else to it first. In that example, you could do something like this inside of B:
val fut = actorC ? someMessage
fut map(someMapFunc) pipeTo sender
Here, the pipeTo
function helps to prevent you from accidentally closing over the mutable sender
var if you were to instead use something like onComplete
and respond to the sender
inside of that callback.
Now, for your case, if you just want A to talk to B and then wait for a response from B (and handle potential timeouts), you could try something like this:
class ActorA extends Actor{
import context._
val myB = context.actorOf(Props[ActorB])
def receive = {
case msg =>
myB ! msg
setReceiveTimeout(2 seconds)
become(waitingForResponse)
}
def waitingForResponse:Receive = {
case ReceiveTimeout =>
println("got a receive timeout")
cancelReceiveTimeout
case response =>
println("got my response back")
cancelReceiveTimeout
}
def cancelReceiveTimeout = setReceiveTimeout(Duration.Undefined)
}
In this example, A starts with a default receive
partial function. When it receives a message, it sends another message to B, sets a receive timeout for receiving a response from B and then switches it's receive
function to something that is specific to waiting for that response from B. In that new receive function, I could either get my response from B in time or I could get a ReceiveTimeout
, indicating that I did not get my response in time. In either case, I cancel my receive timeout because it repeats.
Now this is very simplified, but I was just trying to show one way to do a back-and-forth between two actors, which it seems like your example was showing.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With