 

Fold/reduce over List of Futures with associative & commutative operator

Consider the following:

import scala.concurrent._
import scala.concurrent.duration.Duration.Inf
import scala.concurrent.ExecutionContext.Implicits.global

def slowInt(i: Int) = { Thread.sleep(200); i }
def slowAdd(x: Int, y: Int) = { Thread.sleep(100); x + y }
// Future.apply replaces the lowercase future(...) helper, which was deprecated in Scala 2.11
def futures = (1 to 20).map(i => Future(slowInt(i)))

def timeFuture(fn: => Future[_]) = {
  val t0 = System.currentTimeMillis
  Await.result(fn, Inf)
  println((System.currentTimeMillis - t0) / 1000.0 + "s")
}

Both of the following print ~2.5s:

// Use Future.reduce directly (Future.traverse is no different)
timeFuture { Future.reduce(futures)(slowAdd) }

// First wait for all results to come in, convert to Future[List], and then map the List[Int]
timeFuture { Future.sequence(futures).map(_.reduce(slowAdd)) }

As far as I understand, the reason is that Future.reduce/traverse is generic and therefore cannot take advantage of an associative operator. Is there, however, an easy way to define a computation where the folding/reducing starts as soon as at least 2 values are available (or 1 in the case of fold), so that the items that are already generated are being combined while others in the list are still being generated?

Erik Kaplun asked Mar 30 '14 at 04:03



1 Answer

Scalaz has its own implementation of futures (scalaz.concurrent.Future), and its Nondeterminism instance provides a chooseAny combinator that takes a collection of futures and returns a future of a tuple of the first completed element and the remaining futures:

def chooseAny[A](h: Future[A], t: Seq[Future[A]]): Future[(A, Seq[Future[A]])]

Twitter's implementation of futures calls this select. The standard library doesn't include it (but see Viktor Klang's implementation, which Som Snytt pointed out in a comment on the question). I'll use Scalaz's version here, but translation should be straightforward.
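For a sense of what such a translation involves, here is a minimal sketch of a chooseAny-style combinator built only on scala.concurrent.Future and Promise. The name selectAny and its signature are hypothetical (modelled on the single-argument, Option-returning form that the collapse code below calls via .fold); it is not Viktor Klang's implementation or any library's API.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

implicit val ec: ExecutionContext = ExecutionContext.global

// Hypothetical stdlib analogue of Scalaz's chooseAny / Twitter's select:
// completes with the first value to arrive plus the futures still outstanding,
// or is None when the input is empty.
def selectAny[A](fs: Seq[Future[A]]): Option[Future[(A, Seq[Future[A]])]] =
  if (fs.isEmpty) None
  else {
    // Every future races to complete the promise; the index of the winner
    // lets us drop it from the remaining futures.
    val p = Promise[(Try[A], Int)]()
    fs.zipWithIndex.foreach { case (f, i) => f.onComplete(r => p.trySuccess((r, i))) }
    Some(p.future.flatMap { case (r, i) =>
      // A failed winner fails the result; otherwise pair its value with the rest.
      Future.fromTry(r).map(a => (a, fs.patch(i, Nil, 1)))
    })
  }

// Usage: one future that never completes and one that already has a value.
val pending = Promise[String]().future
val ready = Future.successful("ready")
val (winner, rest) = Await.result(selectAny(Seq(pending, ready)).get, 5.seconds)
println(s"$winner, ${rest.size} still pending") // prints "ready, 1 still pending"
```

Returning the untouched futures alongside the winner is what makes the recursive "take two, push one back" scheme possible: the caller can keep selecting from the rest without waiting for the slow ones.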

One approach to getting the operations to run as you wish is to pull two completed items off the list, push a future of their sum back on the list, and recurse (see this gist for a complete working example):

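import scalaz._, Scalaz._
import scalaz.concurrent.Future

def collapse[A](fs: Seq[Future[A]])(implicit M: Monoid[A]): Future[A] =
  // Wait for whichever future finishes first; an empty list yields the zero.
  Nondeterminism[Future].chooseAny(fs).fold(Future.now(M.zero))(
    _.flatMap {
      case (hv, tf) =>
        // Wait for a second value; if none remain, hv is the final result.
        Nondeterminism[Future].chooseAny(tf).fold(Future.now(hv))(
          _.flatMap {
            // Combine the two values asynchronously and push that future back.
            case (hv2, tf2) => collapse(Future(hv |+| hv2) +: tf2)
          }
        )
    }
  )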

In your case you'd call something like this (with futures and timeFuture adapted to work with Scalaz futures):

timeFuture(
  collapse(futures)(
    Monoid.instance[Int]((a, b) => slowAdd(a, b), 0)
  )
)

This runs in just a touch over 1.6 seconds on my dual-core laptop, so it's working as expected (and will continue to do what you want even if the time taken by slowInt varies).
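For readers who want to stay with the standard library, the same recursion can be sketched with scala.concurrent.Future only, so it runs against the question's original definitions. This is an illustration under stated assumptions, not the code from the gist: selectAny is a hypothetical helper (not a standard-library method) standing in for chooseAny, the Monoid is replaced by an explicit zero and op, and wrapping the sleeps in blocking is our change so that the global pool can add threads while they sleep.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise, blocking}
import scala.concurrent.duration.Duration.Inf
import scala.util.Try

implicit val ec: ExecutionContext = ExecutionContext.global

// Hypothetical helper playing the role of Scalaz's chooseAny:
// the first completed value plus the futures that are still pending.
def selectAny[A](fs: Seq[Future[A]]): Option[Future[(A, Seq[Future[A]])]] =
  if (fs.isEmpty) None
  else {
    val p = Promise[(Try[A], Int)]()
    fs.zipWithIndex.foreach { case (f, i) => f.onComplete(r => p.trySuccess((r, i))) }
    Some(p.future.flatMap { case (r, i) =>
      Future.fromTry(r).map(a => (a, fs.patch(i, Nil, 1)))
    })
  }

// Wait for two values, push a future of their combination back into the pool,
// and recurse. zero stands in for Monoid.zero; op must be associative and
// commutative, because values are combined in completion order, not list order.
def collapse[A](fs: Seq[Future[A]], zero: A)(op: (A, A) => A): Future[A] =
  selectAny(fs) match {
    case None => Future.successful(zero)
    case Some(first) =>
      first.flatMap { case (a, rest) =>
        selectAny(rest) match {
          case None => Future.successful(a) // only one value was left
          case Some(second) =>
            second.flatMap { case (b, rest2) =>
              collapse(Future(op(a, b)) +: rest2, zero)(op)
            }
        }
      }
  }

// The question's setup, with blocking added around the sleeps (our assumption).
def slowInt(i: Int) = blocking { Thread.sleep(200); i }
def slowAdd(x: Int, y: Int) = blocking { Thread.sleep(100); x + y }
def futures = (1 to 20).map(i => Future(slowInt(i)))

val t0 = System.currentTimeMillis
val total = Await.result(collapse(futures, 0)(slowAdd), Inf)
println(s"$total in ${(System.currentTimeMillis - t0) / 1000.0}s")
```

Each call to selectAny registers a callback on every remaining future, so the total bookkeeping grows quadratically with the number of futures; that is harmless for 20 items but worth revisiting for very large lists.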

Travis Brown answered Sep 18 '22 at 13:09
