Here is a simple function that I wrote to do an Akka "ask" with a timed retry. There is an obvious race condition that I'm not sure how to solve.
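    import akka.actor.ActorRef
    import akka.pattern.{ask, AskTimeoutException}
    import akka.util.Timeout
    import scala.concurrent.{ExecutionContext, Future}

    def askWithRetry(actor: ActorRef, message: Any, timeout: Timeout)(implicit ec: ExecutionContext): Future[Any] =
      (actor ? message)(timeout) recoverWith { case e: AskTimeoutException =>
        // do a retry. currently there is no retry limit for simplicity.
        askWithRetry(actor, message, timeout)
      }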
Normally, this works. The "ask" (or ?) creates a temporary intermediate actor for each call. If the target sends a response message, the temporary "ask actor" puts the result in the Future as a successful completion. If the target doesn't respond in time, the future completes with a timeout exception and the recoverWith block does a retry.
However, there is a race condition. If the target sends the response message to the temporary "ask actor", but the timeout is processed before the response message, then the response message will be lost. The retry resends a new request using a new temporary actor. Since the response message was sent to the previous temporary "ask actor", which is now defunct, it will never be processed.
How can I fix this?
I can write a custom version of the Ask pattern with the retry logic built-in that fixes this race condition... I hate to use unnecessary custom code if there is a more standard option though.
UPDATE: Here is the custom version I ended up going with:
    import akka.actor.{Actor, ActorContext, ActorRef, Props, ReceiveTimeout}
    import akka.pattern.AskTimeoutException
    import scala.concurrent.{Future, Promise}
    import scala.concurrent.duration.Duration

    object AskWithRetry {
      def askWithRetry(context: ActorContext, actor: ActorRef, message: Any, retryInterval: Duration, maxRetries: Option[Int]): Future[Any] = {
        val p = Promise[Any]()
        // The intermediate actor completes the promise and then stops itself.
        context.actorOf(props(p, actor, message, retryInterval, maxRetries))
        p.future
      }

      def props(promise: Promise[Any], target: ActorRef, message: Any, retryInterval: Duration, maxRetries: Option[Int]): Props =
        Props(new AskWithRetryIntermediateActor(promise, target, message, retryInterval, maxRetries))
    }

    class AskWithRetryIntermediateActor(promise: Promise[Any], target: ActorRef, message: Any, retryInterval: Duration, var maxRetries: Option[Int]) extends Actor {
      def doSend(): Unit = target ! message

      def receive: Receive = {
        // No message arrived within retryInterval: retry or give up.
        case ReceiveTimeout =>
          maxRetries match {
            case None =>
              //println(s"Retrying. Infinite tries left. ${message}")
              doSend()
            case Some(retryCount) =>
              if (retryCount > 0) {
                //println(s"Retrying. ${retryCount-1} tries left. ${message}")
                maxRetries = Some(retryCount - 1)
                doSend()
              } else {
                //println(s"Exceeded timeout limit. Failing. ${message}")
                if (!promise.isCompleted) {
                  promise.failure(new AskTimeoutException("retry limit reached"))
                }
                context.stop(self)
              }
          }
        // Anything else is treated as the response from the target.
        case otherMessage: Any =>
          if (!promise.isCompleted) {
            //println(s"AskWithRetry: completing ${otherMessage}")
            promise.success(otherMessage)
          }
          context.stop(self)
      }

      context.setReceiveTimeout(retryInterval)
      doSend()
    }
I think your instincts are good. If you want custom actor-logic, you should write it.
The custom ask-waiting actor should send the message to actor and scheduleOnce a message to itself to retry. That way, both responses and timeouts arrive via the receive method and you don't have any races.
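A minimal sketch of that suggestion, assuming classic (untyped) Akka actors. The names ScheduledRetryAsk, RetryingAsker and Retry are made up for illustration and are not part of Akka; the retry limit is a plain Int here rather than the Option[Int] used in the question.

```scala
import akka.actor.{Actor, ActorRef, ActorRefFactory, Cancellable, Props}
import akka.pattern.AskTimeoutException
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration.FiniteDuration

object ScheduledRetryAsk {
  // Hypothetical tick that the intermediate actor schedules to itself.
  case object Retry

  // Spawns a short-lived intermediate actor and returns a future for the reply.
  def ask(factory: ActorRefFactory, target: ActorRef, message: Any,
          retryInterval: FiniteDuration, maxRetries: Int): Future[Any] = {
    val promise = Promise[Any]()
    factory.actorOf(Props(new RetryingAsker(promise, target, message, retryInterval, maxRetries)))
    promise.future
  }
}

class RetryingAsker(promise: Promise[Any], target: ActorRef, message: Any,
                    retryInterval: FiniteDuration, maxRetries: Int) extends Actor {
  import ScheduledRetryAsk.Retry
  import context.dispatcher // execution context used by the scheduler

  private var retriesLeft = maxRetries
  private var pendingTick: Option[Cancellable] = None

  // Send the request and schedule a single Retry tick to this actor.
  private def sendAndSchedule(): Unit = {
    target ! message
    pendingTick = Some(context.system.scheduler.scheduleOnce(retryInterval, self, Retry))
  }

  override def preStart(): Unit = sendAndSchedule()

  // Cancel any outstanding tick once the actor stops.
  override def postStop(): Unit = pendingTick.foreach(_.cancel())

  def receive: Receive = {
    case Retry if retriesLeft > 0 =>
      retriesLeft -= 1
      sendAndSchedule()
    case Retry =>
      promise.tryFailure(new AskTimeoutException("retry limit reached"))
      context.stop(self)
    case reply =>
      // Replies and ticks share one mailbox, so a reply sent to this actor
      // is not dropped just because a tick was processed first.
      promise.trySuccess(reply)
      context.stop(self)
  }
}
```

Because the same actor lives across all retries, a late reply to an earlier attempt still completes the future. The target may still receive the request more than once, so this only makes sense when handling the message is safe to repeat.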