Consider the following:
```scala
import scala.concurrent._
import scala.concurrent.duration.Duration.Inf
import scala.concurrent.ExecutionContext.Implicits.global

def slowInt(i: Int) = { Thread.sleep(200); i }
def slowAdd(x: Int, y: Int) = { Thread.sleep(100); x + y }
def futures = (1 to 20).map(i => Future(slowInt(i)))

def timeFuture(fn: => Future[_]) = {
  val t0 = System.currentTimeMillis
  Await.result(fn, Inf)
  println((System.currentTimeMillis - t0) / 1000.0 + "s")
}
```
Both of the following print ~2.5s:
```scala
// Use Future.reduce directly (Future.traverse is no different)
timeFuture { Future.reduce(futures)(slowAdd) }
// First wait for all results to come in, convert to Future[List], and then map the List[Int]
timeFuture { Future.sequence(futures).map(_.reduce(slowAdd)) }
```
As far as I can understand, the reason for this is that `Future.reduce`/`traverse` is generic and therefore does not run faster with an associative operator. However, is there an easy way to define a computation where the folding/reducing starts as soon as at least two values are available (or one, in the case of `fold`), so that while some items in the list are still being generated, the ones already generated are already being combined?
Scalaz has an implementation of futures that includes a `chooseAny` combinator that takes a collection of futures and returns a future of a tuple of the first completed element and the rest of the futures:

```scala
def chooseAny[A](h: Future[A], t: Seq[Future[A]]): Future[(A, Seq[Future[A]])]
```
Twitter's implementation of futures calls this `select`. The standard library doesn't include it (but see Viktor Klang's implementation pointed out by Som Snytt above). I'll use Scalaz's version here, but translation should be straightforward.
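For anyone staying with `scala.concurrent`, a `select`-style combinator can be sketched with a `Promise`. This is a minimal sketch under stated assumptions, not Viktor Klang's code or Twitter's API: the names `SelectSketch` and `select` are hypothetical, and returning the winner's outcome as a `Try` (so a failed future also counts as "first completed") is a design choice made here.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try

object SelectSketch {
  // Hypothetical helper (not in the standard library): completes with the
  // outcome of the first future to finish, plus all the remaining futures.
  def select[A](fs: Seq[Future[A]])(implicit ec: ExecutionContext): Future[(Try[A], Seq[Future[A]])] = {
    require(fs.nonEmpty, "select needs at least one future")
    // Holds the index of the winner; only the first trySuccess takes effect.
    val winner = Promise[Int]()
    fs.zipWithIndex.foreach { case (f, i) =>
      f.onComplete(_ => winner.trySuccess(i))
    }
    winner.future.map { i =>
      // fs(i) has already completed, so .value is defined; drop it from the rest.
      (fs(i).value.get, fs.patch(i, Nil, 1))
    }
  }
}
```

Callbacks left on the futures that lose the race are harmless: `trySuccess` on an already-completed promise simply returns `false`.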
One approach to getting the operations to run as you wish is to pull two completed items off the list, push a future of their sum back on the list, and recurse (see this gist for a complete working example):
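```scala
import scalaz._, Scalaz._
import scalaz.concurrent.Future

// Take the first two completed futures, combine their values in a new future,
// push it back onto the remaining futures, and recurse until one value is left.
def collapse[A](fs: Seq[Future[A]])(implicit M: Monoid[A]): Future[A] =
  Nondeterminism[Future].chooseAny(fs).fold(Future.now(M.zero))(
    _.flatMap {
      case (hv, tf) =>
        Nondeterminism[Future].chooseAny(tf).fold(Future.now(hv))(
          _.flatMap {
            case (hv2, tf2) => collapse(Future(hv |+| hv2) +: tf2)
          }
        )
    }
  )
```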
In your case (with `futures` and `timeFuture` switched over to Scalaz's `Future`) you'd call something like this:
```scala
timeFuture(
  collapse(futures)(
    Monoid.instance[Int]((a, b) => slowAdd(a, b), 0)
  )
)
```
This runs in just a touch over 1.6 seconds on my dual-core laptop, so it's working as expected (and will continue to do what you want even if the time taken by `slowInt` varies).
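As a standard-library counterpart to `collapse`, the same idea (combine two values as soon as both are available) can also be sketched with completion callbacks instead of a `chooseAny`/`select` loop. This is a sketch, not a translation of the Scalaz code: the names `CollapseSketch` and `collapse` and the `zero`/`op` parameters are hypothetical. Because values are combined in completion order, the result only equals a left-to-right `reduce` if `op` is commutative as well as associative (which holds for addition, as in `slowAdd`).

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

object CollapseSketch {
  // Hypothetical helper: pairs up values as they complete and combines each
  // pair in a new future, until a single value remains.
  def collapse[A](fs: Seq[Future[A]], zero: A)(op: (A, A) => A)(implicit ec: ExecutionContext): Future[A] =
    if (fs.isEmpty) Future.successful(zero)
    else {
      val result = Promise[A]()
      val lock = new Object
      var pending: Option[A] = None // a completed value still waiting for a partner
      var live = fs.size            // values not yet merged away: pending, in flight, or not yet arrived

      def offer(v: A): Unit = {
        // Update the shared state under the lock; start any combination outside it.
        val toCombine: Option[(A, A)] = lock.synchronized {
          if (live == 1) { result.trySuccess(v); None } // v is the final value
          else pending match {
            case None    => pending = Some(v); None
            case Some(p) => pending = None; live -= 1; Some((p, v))
          }
        }
        toCombine.foreach { case (a, b) => watch(Future(op(a, b))) }
      }

      def watch(f: Future[A]): Unit = f.onComplete {
        case Success(v)  => offer(v)
        case Failure(ex) => result.tryFailure(ex) // the first failure fails the whole result
      }

      fs.foreach(watch)
      result.future
    }
}
```

With the question's definitions in scope, `timeFuture { CollapseSketch.collapse(futures, 0)(slowAdd) }` should exercise it; actual timings depend on how many threads the execution context can keep busy while `Thread.sleep` blocks them.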