Suppose I need to run two concurrent computations, wait for both of them, and then combine their results. More specifically, I need to run f1: X1 => Y1 and f2: X2 => Y2 concurrently and then call f: (Y1, Y2) => Y to finally get a value of Y.

I can create future computations fut1: X1 => Future[Y1] and fut2: X2 => Future[Y2] and then compose them to get fut: (X1, X2) => Future[Y] using monadic composition.
The problem is that monadic composition implies a sequential wait: in our case, we wait for one future first and only then wait for the other. For instance, if the first future takes 2 seconds to complete and the second future fails after just 1 second, we waste 1 second.
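For concreteness, here is a minimal sketch of that monadic composition, with X1, X2, Y1, Y2 and Y specialised to simple types just so the snippet compiles; the concrete functions are placeholders of mine:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Placeholder implementations, specialised to Int/String for illustration.
def fut1(x1: Int): Future[Int]       = Future { x1 * 2 }
def fut2(x2: String): Future[String] = Future { x2.reverse }
def f(y1: Int, y2: String): String   = s"$y1-$y2"

// Monadic composition: both futures are started up front, so they do run
// concurrently, but their results are observed one after the other, so a
// failure of fy2 is not noticed until fy1 has completed.
def fut(x1: Int, x2: String): Future[String] = {
  val fy1 = fut1(x1)
  val fy2 = fut2(x2)
  for {
    y1 <- fy1
    y2 <- fy2
  } yield f(y1, y2)
}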
Thus it looks like we need an applicative composition of the futures to wait until either both complete or at least one of them fails. Does that make sense? How would you implement <*> for futures?
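For reference, a naive <*> for Future can be written in terms of zip; the names ap and map2 below are my own and not from any particular library, and as the answer below shows, this version still does not fail fast when the second future fails before the first one completes:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// <*> (ap) for Future, expressed via zip.
def ap[A, B](ff: Future[A => B])(fa: Future[A]): Future[B] =
  ff.zip(fa).map { case (g, a) => g(a) }

// map2, the usual applicative combinator for combining two futures.
def map2[A, B, C](fa: Future[A], fb: Future[B])(g: (A, B) => C): Future[C] =
  ap(fa.map(a => (b: B) => g(a, b)))(fb)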
The simplest way to create a future object is to invoke the Future.apply method, which starts an asynchronous computation and returns a future holding the result of that computation. The result becomes available once the future completes.
A Future represents the result of an asynchronous computation that may or may not be available yet. When we create a new Future, its body is submitted to an ExecutionContext and executed on another thread. Once the execution is finished, the result of the computation (a value or an exception) is assigned to the Future.
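A minimal illustration of Future.apply (the computation below is just a placeholder):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Future.apply submits this block to the execution context and returns
// immediately; the value 42 becomes available once the block finishes.
val result: Future[Int] = Future {
  Thread.sleep(500) // stand-in for a long-running computation
  42
}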
Future.sequence takes a list of futures and transforms it into a single future of a list in an asynchronous manner. For instance, assume that you have a list of independent jobs to be run simultaneously; the list of futures can then be composed into a single future of a list using Future.sequence.
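For example, assuming three hypothetical independent jobs:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Three independent jobs started simultaneously...
val jobs: List[Future[Int]] = List(1, 2, 3).map(n => Future { n * 10 })

// ...combined into a single future of a list.
val combined: Future[List[Int]] = Future.sequence(jobs)

// Blocking here only to show the result; prints List(10, 20, 30).
println(Await.result(combined, 1.second))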
NOTE: With Future.onComplete we are no longer blocking for the result of the Future; instead we register a callback that receives either a Success or a Failure.
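A small sketch of such a callback (the example future is mine):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

val answer: Future[Int] = Future { 21 * 2 }

// The callback is invoked asynchronously once the future completes,
// with either a Success or a Failure.
answer.onComplete {
  case Success(value) => println(s"Got: $value")
  case Failure(cause) => println(s"Failed: ${cause.getMessage}")
}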
None of the methods in the other answers does the right thing in the case of a future that fails quickly combined with a future that succeeds after a long time.
But such a method can be implemented manually:
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}

def smartSequence[A](futures: Seq[Future[A]]): Future[Seq[A]] = {
  val counter = new AtomicInteger(futures.size)
  val result = Promise[Seq[A]]()

  def attemptComplete(t: Try[A]): Unit = {
    val remaining = counter.decrementAndGet
    t match {
      // If one future fails, fail the result immediately
      case Failure(cause) => result tryFailure cause
      // If all futures have succeeded, complete successful result
      case Success(_) if remaining == 0 =>
        result tryCompleteWith Future.sequence(futures)
      case _ =>
    }
  }

  futures.foreach(_ onComplete attemptComplete)
  result.future
}
Scalaz does a similar thing internally, so both f1 |@| f2 and List(f1, f2).sequence fail immediately after any of the futures fails.
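So, assuming the Scalaz future instances are in scope (they need an implicit ExecutionContext), the applicative composition asked about in the question can be written directly; the combine function below is just an illustrative example of mine:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scalaz._, Scalaz._

// Applicative composition of two futures with a binary function:
// per the timings below, the result fails as soon as either future fails.
def combine(fa: Future[Int], fb: Future[Int]): Future[Int] =
  (fa |@| fb)(_ + _)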
Here is a quick test of the failure time for those methods:
import java.util.Date
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scalaz._, Scalaz._
object ReflectionTest extends App {
  def f1: Future[Unit] = Future {
    Thread.sleep(2000)
  }

  def f2: Future[Unit] = Future {
    Thread.sleep(1000)
    throw new RuntimeException("Failure")
  }

  def test(name: String)(
    f: (Future[Unit], Future[Unit]) => Future[Unit]
  ): Unit = {
    val start = new Date().getTime
    f(f1, f2).andThen {
      case _ =>
        println(s"Test $name completed in ${new Date().getTime - start}")
    }
    Thread.sleep(2200)
  }

  test("monadic") { (f1, f2) => for (v1 <- f1; v2 <- f2) yield () }
  test("zip") { (f1, f2) => (f1 zip f2).map(_ => ()) }
  test("Future.sequence") {
    (f1, f2) => Future.sequence(Seq(f1, f2)).map(_ => ())
  }
  test("smartSequence") { (f1, f2) => smartSequence(Seq(f1, f2)).map(_ => ()) }
  test("scalaz |@|") { (f1, f2) => (f1 |@| f2) { case _ => () } }
  test("scalaz sequence") { (f1, f2) => List(f1, f2).sequence.map(_ => ()) }

  Thread.sleep(30000)
}
And the result on my machine (times in milliseconds) is:
Test monadic completed in 2281
Test zip completed in 2008
Test Future.sequence completed in 2007
Test smartSequence completed in 1005
Test scalaz |@| completed in 1003
Test scalaz sequence completed in 1005