In Scala, how would you write a function that takes a sequence of Futures, runs them all, continually retries any that fail, and returns the results?
For example, the signature might be:
def waitRetryAll[T](futures: Seq[Future[T]]): Future[Seq[T]]
Bonus points for a configurable timeout, at which point the function fails and the caller can handle that case.
Bonus bonus points if that error case handler can receive the list of Futures that failed.
Thanks!
Based on "Retry a function that returns a Future", consider
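import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

def retry[T](expr: => Future[T], n: Int = 3): Future[Either[Throwable, T]] = {
  Future
    .unit
    .flatMap(_ => expr).map(v => Right(v))
    .recoverWith {
      // Retry while attempts remain; expr is by-name, so it is re-evaluated here.
      case _ if n > 1 => retry(expr, n - 1)
      // Out of attempts: report the last error as a Left instead of failing.
      case e => Future.successful(Left(e))
    }
}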
in combination with Future.sequence, which converts List[Future[T]] to Future[List[T]]. However, sequence has fail-fast behaviour, thus we had to lift our Future[T] to Future[Either[Throwable, T]].
Putting these pieces together we can define
def waitRetryAll[T](futures: List[() => Future[T]]): Future[List[Either[Throwable, T]]] = {
  Future.sequence(futures.map(f => retry(f.apply())))
}
and use it like so
val futures = List(
  () => Future(42),
  () => Future(throw new RuntimeException("boom 1")),
  () => Future(11),
  () => Future(throw new RuntimeException("boom 2"))
)
waitRetryAll(futures)
  .andThen { case v => println(v) }
which outputs
Success(List(Right(42), Left(java.lang.RuntimeException: boom 1), Right(11), Left(java.lang.RuntimeException: boom 2)))
We can collect our Lefts or Rights and recover or continue processing accordingly, for example
waitRetryAll(futures)
  .map(_.collect { case v if v.isLeft => v })
...
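As a minimal sketch of that kind of post-processing, the list of Eithers can be split into the errors and the successful values. The helper name splitResults and the sample data are hypothetical, chosen only for illustration:

```scala
// Hypothetical helper: separate the failures from the successes.
def splitResults[T](results: List[Either[Throwable, T]]): (List[Throwable], List[T]) = {
  val failures  = results.collect { case Left(e)  => e }
  val successes = results.collect { case Right(v) => v }
  (failures, successes)
}

// Sample data standing in for the result of waitRetryAll.
val sample: List[Either[Throwable, Int]] =
  List(Right(42), Left(new RuntimeException("boom 1")), Right(11))

val outcome        = splitResults(sample)
val failedMessages = outcome._1.map(_.getMessage) // List("boom 1")
val succeeded      = outcome._2                   // List(42, 11)
```

The same function could be applied inside a .map on the Future returned by waitRetryAll, so the caller receives both lists at once.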
Note how we had to pass in List[() => Future[T]] instead of List[Future[T]] in order to prevent the futures from eagerly kicking off.
As far as I remember, the standard library has no utilities for Future timeouts.
How would you interrupt or cancel an ongoing computation on the JVM? In the general case, you cannot: you can only interrupt a Thread while it is in wait, but what if it never waits? IO libraries for async computation (which define cancellation) execute an IO as a series of smaller non-interruptible tasks (each map/flatMap creates a new step). If they receive a cancel/timeout, they continue executing the current task (as they cannot stop it) but they won't start the next one. You could return an exception on timeout, but the last step would still be executed, so if it was a side effect (e.g. a DB operation) it would complete after you had already returned a failure.
This is non-intuitive and tricky, and I think it is the reason why this behavior wasn't added to the standard library.
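To make the "return an exception on timeout" idea concrete, here is a rough sketch built on Promise and a JDK scheduled executor. The names withTimeout and timeoutScheduler are hypothetical and not part of the standard library. As described above, the underlying computation is not cancelled; only the returned Future fails early.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

implicit val ec: ExecutionContext = ExecutionContext.global

// Shared daemon scheduler (hypothetical name) used only to fire timeouts.
val timeoutScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
  def newThread(r: Runnable): Thread = {
    val t = new Thread(r, "future-timeout")
    t.setDaemon(true)
    t
  }
})

// Hypothetical helper: completes with the result of `future`, or fails with a
// TimeoutException if no result arrives in time. `future` itself keeps running.
def withTimeout[A](future: Future[A], timeout: FiniteDuration)(
    implicit ec: ExecutionContext): Future[A] = {
  val promise = Promise[A]()
  val task = timeoutScheduler.schedule(
    new Runnable {
      def run(): Unit = {
        promise.tryFailure(new TimeoutException(s"no result within $timeout"))
        ()
      }
    },
    timeout.length, timeout.unit)
  // Whichever happens first wins; the pending timeout task is cancelled.
  future.onComplete { result =>
    promise.tryComplete(result)
    task.cancel(false)
  }
  promise.future
}

// Demo: the slow body still runs to the end, but the caller sees a failure early.
val slow    = Future { Thread.sleep(2000); "done" }
val outcome = Try(Await.result(withTimeout(slow, 100.millis), 1.second))
```

Here outcome is a Failure, while slow still completes in the background, which is exactly the surprising behaviour described above.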
Additionally, a Future is an ongoing, potentially side-effecting operation. You cannot take a value of type Future[A] and rerun it. You could, however, pass the future as a by-name parameter, so that in .recoverWith you could create the future anew.
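The difference can be seen in a small sketch that counts how often the body of a Future runs. The helpers twiceByValue and twiceByName are hypothetical and exist only for this illustration:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

implicit val ec: ExecutionContext = ExecutionContext.global

// Counts how many times the Future body has actually been executed.
val runs = new AtomicInteger(0)

// Hypothetical helpers: both refer to `future` twice.
def twiceByValue[A](future: Future[A]): Future[A]   = future.flatMap(_ => future)
def twiceByName[A](future: => Future[A]): Future[A] = future.flatMap(_ => future)

// By value: the Future is created once before the call, so the body runs once.
val byValueResult = Await.result(twiceByValue(Future(runs.incrementAndGet())), 1.second)
val afterByValue  = runs.get() // 1

// By name: each reference builds a new Future, so the body runs twice.
val byNameResult = Await.result(twiceByName(Future(runs.incrementAndGet())), 1.second)
val afterByName  = runs.get() - afterByValue // 2
```

This is why a retry helper has to take either a by-name parameter or a () => Future[A] function: a plain Future[A] value would only replay the already-computed result.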
That being said, you could implement something like "retry until the LocalDateTime.now - startTime >= ", because this is what I think you want:
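import java.time.Instant
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// A recursive method needs an explicit return type.
def retry[A](future: => Future[A], attemptsLeft: Int, timeoutTime: Instant): Future[A] =
  future.recoverWith {
    case error: Throwable =>
      // Give up once the attempts are used up or the deadline has passed.
      if (attemptsLeft <= 0 || Instant.now.isAfter(timeoutTime)) Future.failed(error)
      else retry(future, attemptsLeft - 1, timeoutTime)
  }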
That can be combined with Future.sequence to create a list of results:
def retryFutures[A](list: List[() => Future[A]]) = {
  val attempts: Int = ...
  val timeout: Instant = ...
  Future.sequence(list.map(future => retry(future(), attempts, timeout)))
}
If you want to keep track of which futures failed and which succeeded:
def futureAttempt[A](future: Future[A]): Future[Either[Throwable, A]] =
  future.map(a => Right(a)).recover {
    case error: Throwable => Left(error)
  }
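def retryFutures[A](list: List[() => Future[A]]) = {
  val attempts: Int = ...
  val timeout: Instant = ...
  // Retry first, then capture the final outcome as an Either; wrapping the
  // other way round would never retry, because futureAttempt never fails.
  Future.sequence(list.map(future => futureAttempt(retry(future(), attempts, timeout))))
}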
If you would rather not deal with cancelling futures on the JVM yourself, and you have more cases like this, I would suggest using a library.
If you want to use something that implements retry for you, there is cats-retry.
If you want something better than Future at defining computations (e.g. something which won't require you to use by-name params or nullary functions), try out Monix or ZIO (https://zio.dev/).