I have to run multiple futures in parallel and the program shouldn't crash or hang.
For now I wait on the futures one by one and use a fallback value if there is a TimeoutException.
val future1 = // start future1
val future2 = // start future2
val future3 = // start future3
// <- at this point all 3 futures are running
// waits for maximum of timeout1 seconds
val res1 = toFallback(future1, timeout1, Map[String, Int]())
// .. timeout2 seconds
val res2 = toFallback(future2, timeout2, List[Int]())
// ... timeout3 seconds
val res3 = toFallback(future3, timeout3, Map[String, BigInt]())
def toFallback[T](f: Future[T], to: Int, default: T) = {
  // Returns Try[T]: the default on timeout; any other failure stays a Failure
  Try(Await.result(f, to.seconds))
    .recover { case _: TimeoutException => default }
}
As far as I can see, the maximum wait time of this snippet is timeout1 + timeout2 + timeout3.
My question is: how can I wait on all of those futures at once, so that the wait time is reduced to max(timeout1, timeout2, timeout3)?
EDIT: In the end I used a modification of the answers by @Jatin and @senia:
private def composeWaitingFuture[T](fut: Future[T],
                                    timeout: Int, default: T) =
  future { Await.result(fut, timeout.seconds) } recover {
    case e: Exception => default
  }
and later it's used as follows:
// starts futures immediately and waits for maximum of timeoutX seconds
val res1 = composeWaitingFuture(future1, timeout1, Map[String, Int]())
val res2 = composeWaitingFuture(future2, timeout2, List[Int]())
val res3 = composeWaitingFuture(future3, timeout3, Map[String, BigInt]())
// takes at most max(timeout1, timeout2, timeout3) to complete
val combinedFuture =
  for {
    r1 <- res1
    r2 <- res2
    r3 <- res3
  } yield (r1, r2, r3)
and later I use combinedFuture as I see fit.
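For reference, here is a minimal self-contained sketch of the approach from the EDIT, using only the standard library. It assumes `Future { ... }` in place of the older `future { ... }`, takes the timeout as a `FiniteDuration` rather than an `Int`, and uses made-up tasks (`fast`, `slow`, `failing`) purely for illustration.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ComposeWaitingDemo {
  // Wraps the blocking Await in its own future, so each per-future timeout
  // runs concurrently with the others instead of one after another.
  def composeWaitingFuture[T](fut: Future[T], timeout: FiniteDuration, default: T): Future[T] =
    Future { Await.result(fut, timeout) } recover {
      case _: Exception => default // timeout, or failure of `fut` itself
    }

  def run(): (Map[String, Int], List[Int], String) = {
    // Hypothetical tasks, not from the original post.
    val fast    = Future { Map("a" -> 1) }
    val slow    = Future { blocking { Thread.sleep(2000) }; List(1, 2, 3) }
    val failing = Future[String] { throw new RuntimeException("boom") }

    // Create the wrappers BEFORE the for-comprehension so all of them start now.
    val res1 = composeWaitingFuture(fast, 1.second, Map.empty[String, Int])
    val res2 = composeWaitingFuture(slow, 200.millis, List.empty[Int])
    val res3 = composeWaitingFuture(failing, 1.second, "fallback")

    val combined = for { r1 <- res1; r2 <- res2; r3 <- res3 } yield (r1, r2, r3)
    Await.result(combined, 3.seconds)
  }
}
```

Note that each wrapper occupies a thread while it waits, so this is probably best kept to a small number of futures.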
You could create a future that returns the results of all 3 futures using flatMap or a for-comprehension:
val combinedFuture =
  for {
    r1 <- future1
    r2 <- future2
    r3 <- future3
  } yield (r1, r2, r3)
// assumes timeout1..timeout3 are FiniteDuration values (e.g. 5.seconds), not plain Ints
val (r1, r2, r3) = Await.result(combinedFuture, Seq(timeout1, timeout2, timeout3).max)
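One thing worth seeing in practice: with a single `Await.result` on the combined future there are no per-future fallbacks, so a slow future makes the whole wait fail with a `TimeoutException`. The sketch below illustrates this under some assumptions: the helper `awaitAll`, the two-future shape, and the sample tasks are hypothetical, and the result is wrapped in `Try` so the caller does not crash.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object SingleAwaitDemo {
  // Awaits the combined future once; the wait is bounded by the largest timeout.
  def awaitAll[A, B](fa: Future[A], fb: Future[B], timeouts: Seq[FiniteDuration]): Try[(A, B)] = {
    val combined = for { a <- fa; b <- fb } yield (a, b)
    Try(Await.result(combined, timeouts.max))
  }

  def run(): (Try[(Int, String)], Try[(Int, String)]) = {
    val timeouts = Seq(300.millis, 500.millis)

    // Both futures finish quickly: the combined result is a Success.
    val ok = awaitAll(Future(1), Future("x"), timeouts)

    // Hypothetical slow task: it exceeds max(timeouts), so the whole await fails.
    val slow = Future { blocking { Thread.sleep(2000) }; "late" }
    val timedOut = awaitAll(Future(1), slow, timeouts)

    (ok, timedOut)
  }
}
```

If partial results matter, each future needs its own fallback before the futures are combined.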
If you are using Akka, you could complete your future with a default value after a timeout:
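// Needs an implicit ExecutionContext in scope for `after` and `firstCompletedOf`
implicit class FutureHelper[T](val f: Future[T]) extends AnyVal {
  import akka.pattern.after
  def orDefault(t: Timeout, default: => T)(implicit system: ActorSystem): Future[T] = {
    // Completes with the default once the timeout has elapsed
    val delayed = after(t.duration, system.scheduler)(Future.successful(default))
    // Whichever finishes first wins: the real result or the delayed default
    Future firstCompletedOf Seq(f, delayed)
  }
}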
val combinedFuture =
  for {
    r1 <- future1.orDefault(timeout1, Map())
    r2 <- future2.orDefault(timeout2, List())
    r3 <- future3.orDefault(timeout3, Map())
  } yield (r1, r2, r3)
// allowance is not defined above; presumably a small extra duration that gives the defaults time to fire
val (r1, r2, r3) = Await.result(combinedFuture, allowance + Seq(timeout1, timeout2, timeout3).max)
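If Akka is not available, roughly the same idea can be sketched with the standard library alone. This is an illustration under assumptions, not the answer's code: the `delayed` helper, the use of a `ScheduledExecutorService` in place of Akka's scheduler, and the sample tasks are all hypothetical.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future, Promise, blocking}
import scala.concurrent.duration._
import scala.util.Try

object OrDefaultDemo {
  // Hypothetical helper: a future that completes with `value` after `delay`.
  def delayed[T](delay: FiniteDuration, value: => T)
                (implicit s: ScheduledExecutorService): Future[T] = {
    val p = Promise[T]()
    s.schedule(new Runnable { def run(): Unit = p.complete(Try(value)) },
               delay.toMillis, TimeUnit.MILLISECONDS)
    p.future
  }

  // Whichever completes first wins: the real future or the delayed default.
  def orDefault[T](f: Future[T], timeout: FiniteDuration, default: => T)
                  (implicit s: ScheduledExecutorService, ec: ExecutionContext): Future[T] =
    Future.firstCompletedOf(Seq(f, delayed(timeout, default)))

  def run(): (Int, List[Int]) = {
    implicit val scheduler: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.global
    try {
      val fast = Future(42)
      val slow = Future { blocking { Thread.sleep(2000) }; List(1, 2, 3) } // hypothetical slow task
      // Start both timers up front so the timeouts run concurrently.
      val fastR = orDefault(fast, 500.millis, -1)
      val slowR = orDefault(slow, 200.millis, List.empty[Int])
      val combined = for { a <- fastR; b <- slowR } yield (a, b)
      Await.result(combined, 1.second)
    } finally scheduler.shutdownNow()
  }
}
```

Unlike the blocking wrapper, no thread sits waiting per future here. As with the Akka version, a future that fails before its timeout still fails the combined result, because `firstCompletedOf` passes on the first completion, successful or not.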