 

Scala retry sequence of futures until they all complete

In Scala, how would you write a function that takes a sequence of Futures, runs them all, keeps retrying any that fail, and returns the results?

For example, the signature might be:

  def waitRetryAll[T](futures: Seq[Future[T]]): Future[Seq[T]]

Bonus points for a configurable timeout, at which point the function fails and the caller can handle that case.
Bonus bonus points if that error-case handler can receive the list of Futures that failed.

Thanks!

Jethro asked Mar 02 '23 23:03



2 Answers

Based on the question "Retry a function that returns a Future", consider

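import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Evaluates `expr` up to `n` times. The last failure is returned as a Left
// instead of failing the resulting Future.
def retry[T](expr: => Future[T], n: Int = 3): Future[Either[Throwable, T]] = {
  Future
    .unit
    .flatMap(_ => expr) // also captures exceptions thrown while creating the Future
    .map(v => Right(v))
    .recoverWith {
      case _ if n > 1 => retry(expr, n - 1)  // attempts remain: evaluate expr again
      case e => Future.successful(Left(e))   // out of attempts: report the error
    }
}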

in combination with

Future.sequence

which converts List[Future[T]] to Future[List[T]]. However, the Future returned by sequence fails as soon as any element fails, and the other results are lost, so we have to lift each Future[T] to Future[Either[Throwable, T]].
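To illustrate the difference, here is a minimal sketch comparing plain Future.sequence with the lifted version. The object and method names (SequenceDemo, lift, plain, lifted) are made up for this example and are not part of the answer's code.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical demo object; all names here are illustrative only.
object SequenceDemo {
  private def inputs(): List[Future[Int]] =
    List(Future(1), Future.failed(new RuntimeException("boom")), Future(3))

  // Turns a possibly failing Future into one that always succeeds with an Either.
  def lift[T](f: Future[T]): Future[Either[Throwable, T]] =
    f.map(Right(_)).recover { case e => Left(e) }

  // Plain sequence: a single failure fails the combined Future.
  def plain(): Try[List[Int]] =
    Try(Await.result(Future.sequence(inputs()), 5.seconds))

  // Lifted sequence: every element is kept, as either a Left or a Right.
  def lifted(): List[Either[Throwable, Int]] =
    Await.result(Future.sequence(inputs().map(lift)), 5.seconds)
}
```

Here SequenceDemo.plain() is a Failure, while SequenceDemo.lifted() keeps all three elements, with the error in the middle position as a Left.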

Putting these pieces together we can define

def waitRetryAll[T](futures: List[() => Future[T]]): Future[List[Either[Throwable, T]]] = {
  Future.sequence(futures.map(f => retry(f.apply())))
}

and use it like so

val futures = List(
  () => Future(42),
  () => Future(throw new RuntimeException("boom 1")),
  () => Future(11),
  () => Future(throw new RuntimeException("boom 2"))
)

waitRetryAll(futures)
  .andThen { case v => println(v) }

which outputs

Success(List(Right(42), Left(java.lang.RuntimeException: boom 1), Right(11), Left(java.lang.RuntimeException: boom 2)))

We can collect our Lefts or Rights and recover or continue processing accordingly, for example

waitRetryAll(futures)
  .map(_.collect{ case v if v.isLeft => v })
  ...
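As one possible way to "recover or continue", the sketch below fails the outer Future when any element is a Left and hands all surviving errors to the caller in a single exception. SomeFailed and allOrFail are hypothetical names introduced for this illustration, not part of any library.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper; the names are assumptions made for this sketch.
object PartitionDemo {
  // Carries every error that was still present after the retries.
  final case class SomeFailed(errors: List[Throwable])
    extends RuntimeException(s"${errors.size} future(s) failed")

  // Succeeds with all values if there are no Lefts,
  // otherwise fails with SomeFailed holding the collected errors.
  def allOrFail[T](results: Future[List[Either[Throwable, T]]])(
      implicit ec: ExecutionContext): Future[List[T]] =
    results.flatMap { rs =>
      val errors = rs.collect { case Left(e) => e }
      if (errors.isEmpty) Future.successful(rs.collect { case Right(v) => v })
      else Future.failed(SomeFailed(errors))
    }
}
```

A caller could then write PartitionDemo.allOrFail(waitRetryAll(futures)).recover { case PartitionDemo.SomeFailed(errors) => ... } to inspect the failures.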

Note how we had to pass in List[() => Future[T]] instead of List[Future[T]]: a Future starts running as soon as it is created, so wrapping each one in a function prevents it from kicking off eagerly and lets retry create it again.

Mario Galic answered Mar 15 '23 13:03



As far as I remember, the standard library has no utilities for timing out a Future.

How would you interrupt or cancel an ongoing computation on the JVM? In the general case you cannot: you can only interrupt a Thread while it is waiting, but what if it never waits? IO libraries for asynchronous computation that define cancellation execute IO as a series of smaller, non-interruptible tasks (each map/flatMap creates a new step). If they receive a cancel or timeout signal, they still finish the current task, because they cannot stop it, but they do not start the next one. You could return an exception on timeout, but the last step would still run to completion, so if it was a side effect (e.g. a DB operation), it would complete after you had already returned a failure.

This is non-intuitive and tricky, and I think that is why this behavior wasn't added to the standard library.
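The "return an exception on timeout" approach can be sketched as follows, assuming a plain java.util.concurrent scheduler is acceptable. TimeoutDemo, withTimeout and demo are hypothetical names for this example. The sketch only stops waiting for the result; it does not cancel the computation.

```scala
import java.util.concurrent.{CountDownLatch, Executors, ThreadFactory, TimeUnit, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical sketch; names are illustrative only.
object TimeoutDemo {
  // A single daemon thread used only to fire timeouts.
  private val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "timeout-scheduler")
      t.setDaemon(true)
      t
    }
  })

  // Completes with the result of `future`, or fails with TimeoutException if
  // `after` elapses first. The underlying computation is NOT cancelled.
  def withTimeout[A](future: Future[A], after: FiniteDuration)(
      implicit ec: ExecutionContext): Future[A] = {
    val timeout = Promise[A]()
    val task = scheduler.schedule(new Runnable {
      def run(): Unit = { timeout.tryFailure(new TimeoutException(s"timed out after $after")); () }
    }, after.toMillis, TimeUnit.MILLISECONDS)
    future.onComplete(_ => task.cancel(false)) // no need to fire once the result is in
    Future.firstCompletedOf(List(future, timeout.future))
  }

  // Returns (the caller saw a timeout, the side effect still happened afterwards).
  def demo(): (Boolean, Boolean) = {
    import ExecutionContext.Implicits.global
    val sideEffect = new CountDownLatch(1)
    val slow = Future { Thread.sleep(500); sideEffect.countDown(); 42 }
    val timedOut = Try(Await.result(withTimeout(slow, 50.millis), 2.seconds)).isFailure
    (timedOut, sideEffect.await(2, TimeUnit.SECONDS))
  }
}
```

In demo() the caller receives the timeout failure first, and the side effect inside the slow Future still completes later, which is exactly the pitfall described above.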

Additionally, a Future is an ongoing, potentially side-effecting operation. You cannot take a value of type Future[A] and rerun it. You could, however, pass the future as a by-name parameter, so that in .recoverWith you can create the future anew.
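A small sketch of that difference, counting how often the body of a Future runs when it is passed by value versus by name. RerunDemo and its methods are made-up names for this illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical demo object; names are illustrative only.
object RerunDemo {
  // Already-created Future: referring to it twice does not run its body twice.
  def twiceByValue(f: Future[Int]): Future[Int] = f.flatMap(_ => f)

  // By-name parameter: every reference creates and runs a new Future.
  def twiceByName(f: => Future[Int]): Future[Int] = f.flatMap(_ => f)

  // Returns how many times the body ran in each case.
  def demo(): (Int, Int) = {
    val byValue = new AtomicInteger(0)
    val byName = new AtomicInteger(0)
    Await.result(twiceByValue(Future(byValue.incrementAndGet())), 5.seconds)
    Await.result(twiceByName(Future(byName.incrementAndGet())), 5.seconds)
    (byValue.get, byName.get)
  }
}
```

RerunDemo.demo() returns (1, 2): the by-value Future ran once, while the by-name expression was evaluated, and therefore run, twice.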

That being said, you could implement something like "retry until the LocalDateTime.now - startTime >= " because this is what I think you want:

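import java.time.Instant
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

def retry[A](future: => Future[A], attemptsLeft: Int, timeoutTime: Instant): Future[A] =
  future.recoverWith {
    case error: Throwable =>
      // give up once the attempts are used up or the deadline has passed
      if (attemptsLeft <= 0 || Instant.now.isAfter(timeoutTime)) Future.failed(error)
      else retry(future, attemptsLeft - 1, timeoutTime)
  }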

That can be combined with Future.sequence to create a list of results:

def retryFutures[A](list: List[() => Future[A]]) = {
  val attempts: Int = ...
  val timeout: Instant = ...
  Future.sequence(list.map(future => retry(future(), attempts, timeout)))
}

If you want to keep track of which futures failed and which succeeded:

def futureAttempt[A](future: Future[A]): Future[Either[Throwable, A]] =
  future.map(a => Right(a)).recover {
    case error: Throwable => Left(error)
  }

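def retryFutures[A](list: List[() => Future[A]]) = {
  val attempts: Int = ...
  val timeout: Instant = ...
  // Retry first, then wrap: futureAttempt never fails, so wrapping before
  // retrying would mean that retry never sees an error and never retries.
  Future.sequence(list.map(future => futureAttempt(retry(future(), attempts, timeout))))
}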

If you don't want to trouble yourself with cancelling futures on the JVM, and you have more cases like this, I would suggest using a library.

If you want to use something that implements retry for you, there is cats-retry.

If you want something better than Future for defining computations (e.g. something that doesn't require you to use by-name params or nullary functions), try out Monix or ZIO (https://zio.dev/).

Mateusz Kubuszok answered Mar 15 '23 13:03
