I was hoping that code like the following would wait for both futures, but it does not.
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    object Fiddle {
      val f1 = Future {
        throw new Throwable("baaa") // emulating a future that bumped into an exception
      }

      val f2 = Future {
        Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
        2
      }

      val lf = List(f1, f2) // in the general case, this would be a dynamically sized list

      val seq = Future.sequence(lf)

      seq.onComplete {
        _ => lf.foreach(f => println(f.isCompleted))
      }
    }

    val a = Fiddle // referencing the object forces its initialization
I assumed seq.onComplete would wait for them all to complete before completing itself, but not so; it results in:
    true
    false
.sequence was a bit hard to follow in the source of scala.concurrent.Future. I wonder how I would implement a parallel version that waits for all original futures of a (dynamically sized) sequence, or what the problem might be here.
Edit: A related question: https://worldbuilding.stackexchange.com/questions/12348/how-do-you-prove-youre-from-the-future :)
By default, futures and promises are non-blocking, making use of callbacks instead of typical blocking operations. To simplify the use of callbacks both syntactically and conceptually, Scala provides combinators such as flatMap, foreach, and filter, used to compose futures in a non-blocking way.
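Whenever we create a new Future, its body is submitted to the ExecutionContext in implicit scope, which typically runs it on a thread taken from a pool rather than spawning a fresh thread for each Future; after completion, any registered callbacks are executed. For a function such as add whose body is a Future wrapping an Int expression, Scala will infer a return type of Future[Int], and the enclosed code will be scheduled asynchronously each time the function is called.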
An ExecutionContext can execute program logic asynchronously, typically but not necessarily on a thread pool. A general purpose ExecutionContext must be asynchronous in executing any Runnable that is passed into its execute method.
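As a minimal sketch of how an ExecutionContext comes into play, the hypothetical add below (its body and the helper addOnOwnPool are assumptions made for illustration, not code from the original post) submits its work to whichever ExecutionContext is in implicit scope. The helper builds one from a small fixed thread pool and shuts the pool down afterwards.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ExecutionContextSketch {
  // Hypothetical `add`: the body is handed to the implicit ExecutionContext,
  // which decides where and when it runs.
  def add(a: Int, b: Int)(implicit ec: ExecutionContext): Future[Int] =
    Future(a + b)

  // Runs `add` on a dedicated two-thread pool, then shuts the pool down.
  // Blocking with Await is only for demonstration purposes.
  def addOnOwnPool(a: Int, b: Int): Int = {
    val pool = Executors.newFixedThreadPool(2)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try Await.result(add(a, b), 5.seconds)
    finally pool.shutdown()
  }
}
```

In most application code, importing scala.concurrent.ExecutionContext.Implicits.global is enough; a dedicated pool is mainly useful when the work should be isolated from the rest of the program.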
One common approach to waiting for all results (failed or not) is to "lift" failures into a new representation inside the future, so that all futures complete with some result (although they may complete with a result that represents failure). One natural way to get that is lifting to a Try.
Twitter's implementation of futures provides a liftToTry method that makes this trivial, but you can do something similar with the standard library's implementation:
    import scala.util.{ Failure, Success, Try }

    val lifted: List[Future[Try[Int]]] = List(f1, f2).map(
      _.map(Success(_)).recover { case t => Failure(t) }
    )
Now Future.sequence(lifted) will be completed when every future is completed, and will represent successes and failures using Try.
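On Scala 2.12 or later, the same lifting can probably be written more compactly with Future.transform, which receives the completed Try directly. The helper name liftToTry below is borrowed from the Twitter method mentioned above purely for illustration; it is not part of the standard library.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Success, Try}

object LiftSketch {
  // Wraps whatever outcome `f` has (success or failure) in a Success,
  // so the returned future itself never fails with f's exception.
  def liftToTry[T](f: Future[T])(implicit ec: ExecutionContext): Future[Try[T]] =
    f.transform(Success(_))
}
```

With this helper, Future.sequence(futures.map(LiftSketch.liftToTry(_))) would complete only once every underlying future has completed.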
And so, a generic solution for waiting on all original futures of a sequence of futures may look as follows, assuming an execution context is implicitly available.
    import scala.util.{ Failure, Success, Try }

    private def lift[T](futures: Seq[Future[T]]) =
      futures.map(_.map { Success(_) }.recover { case t => Failure(t) })

    // having neutralized exception completions through the lifting, .sequence can now be used
    def waitAll[T](futures: Seq[Future[T]]) =
      Future.sequence(lift(futures))

    waitAll(SeqOfFutures).map {
      // do whatever with the completed futures
    }
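To make the "do whatever" step concrete, here is a self-contained sketch of one way the lifted results might be consumed. It restates waitAll with explicit types, and successesAndFailures is a hypothetical helper introduced only for this example.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object WaitAllSketch {
  // Same idea as above, with explicit result types: every future is lifted
  // to a Try so that Future.sequence only completes when all of them have.
  def waitAll[T](futures: Seq[Future[T]])(implicit ec: ExecutionContext): Future[Seq[Try[T]]] =
    Future.sequence(
      futures.map(f => f.map(v => Success(v): Try[T]).recover { case t => Failure(t) })
    )

  // Hypothetical helper: splits the lifted results into values and errors.
  def successesAndFailures[T](results: Seq[Try[T]]): (Seq[T], Seq[Throwable]) =
    (results.collect { case Success(v) => v }, results.collect { case Failure(e) => e })
}
```

Applied to the two futures from the question, this would be expected to yield one successful value and one Throwable, and only after the slower future has finished.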
A Future produced by Future.sequence completes when either: (1) all of the futures have completed successfully, or (2) one of the futures has failed.
The second point is what's happening in your case, and it makes sense to complete as soon as one of the wrapped Futures has failed, because the wrapping Future can only hold a single Throwable in the failure case. There's no point in waiting for the other futures because the result will be the same failure.
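The behaviour described here can be observed with a small sketch along the lines of the question's code. The object and method names are made up for illustration, and the failing future is deliberately placed first in the list, as in the question.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FailFastSketch {
  // Returns (did the sequence fail?, had the slow future completed by then?).
  def observe()(implicit ec: ExecutionContext): (Boolean, Boolean) = {
    val failing = Future.failed[Int](new RuntimeException("baaa"))
    val slow = Future { Thread.sleep(1000L); 2 }
    val seq = Future.sequence(List(failing, slow))

    // Await.ready waits for completion without rethrowing the failure.
    Await.ready(seq, 5.seconds)
    (seq.value.exists(_.isFailure), slow.isCompleted)
  }
}
```

Under these assumptions, observe() should report that the sequence has already failed while the slow future is still running, which matches the true/false output in the question.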