Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Akka Ask with Timed Retry

Tags:

scala

akka

Here is a simple function that I wrote to do an Akka "ask" with a timed retry. There is an obvious race condition that I'm not sure how to solve.

def askWithRetry(actor: ActorRef, message: Any, timeout: Timeout): Future[Any] =
  (actor ? message)(timeout) recoverWith { case e: AskTimeoutException =>
      // do a retry. currently there is no retry limit for simplicity.
      askWithRetry(actor, message, timeout)
    }

Normally, this works. The "ask" or ? creates a temporary intermediate actor for each call. If the target sends a response message, the temporary "ask actor" puts the result in the Future as a successful completion. If the target doesn't respond in time, the future completes with a timeout exception and the recoverWith process does a retry.

However, there is a race condition. If the target sends the response message to the temporary "ask actor", but a timeout is processed before the response message, then the response message will be lost. The retry process resends a new request using a new temporary actor. Since the response message was sent to the previous temporary "ask actor" which is now defunct it will not be processed and is lost.

How can I fix this?

I can write a custom version of the Ask pattern with the retry logic built-in that fixes this race condition... I hate to use unnecessary custom code if there is a more standard option though.

UPDATE: Here is the custom version I ended up going with:

object AskWithRetry {
  def askWithRetry(context: ActorContext, actor: ActorRef, message: Any, retryInterval: Duration, maxRetries: Option[Int]): Future[Any] = {
    val p = Promise[Any]

    val intermediate = context.actorOf(props(p, actor, message, retryInterval, maxRetries))

    p.future
  }

  def props(promise: Promise[Any], target: ActorRef, message: Any, retryInterval: Duration, maxRetries: Option[Int]): Props =
          Props(new AskWithRetryIntermediateActor(promise, target, message, retryInterval, maxRetries))
}

class AskWithRetryIntermediateActor(promise: Promise[Any], target: ActorRef, message: Any, retryInterval: Duration, var maxRetries: Option[Int]) extends Actor {
  def doSend(): Unit = target ! message

  def receive: Receive = {
    case ReceiveTimeout =>
      maxRetries match {
        case None =>
          //println(s"Retrying. Infinite tries left. ${message}")
          doSend()
        case Some(retryCount) =>
          if (retryCount > 0) {
            //println(s"Retrying. ${retryCount-1} tries left. ${message}")
            maxRetries = Some(retryCount - 1)
            doSend()
          } else {
            //println(s"Exceeded timeout limit. Failing. ${message}")
            if (!promise.isCompleted) {
              promise.failure(new AskTimeoutException("retry limit reached"))
            }
            context.stop(self)
          }
      }
    case otherMessage: Any =>
      if (!promise.isCompleted) {
        //println(s"AskWithRetry: completing ${otherMessage}")
        promise.success(otherMessage)
      }
      context.stop(self)
  }

  context.setReceiveTimeout(retryInterval)
  doSend()
}
like image 787
user2684301 Avatar asked Oct 20 '22 13:10

user2684301


1 Answers

I think your instincts are good. If you want custom actor-logic, you should write it.

The custom ask-waiting actor should send the message to actor and scheduleOnce a message to itself to retry. That way, both responses and timeouts arrive via the receive method and you don't have any races.

like image 67
Rob Starling Avatar answered Oct 23 '22 15:10

Rob Starling