Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Proper usage of Futures in parallel calculations

Tags:

scala

I'm implementing an algorithm that can be easily parallelized but can't figure out how to create a proper number of futures and how to abort early. At the moment the outline of the code is along these lines

def solve: Boolean = {
  var result = false
  while(!result && i < iterations) {
    val futures = (1 to threads) map { _ => solveIter(geInitialValues()) }
    val loopResult = Future.fold(futures)(false)((acc, r) => acc || r )
    result = Await.result(loopResult, Duration.Inf)
    i+=1
  }
}

def solveIter(initialValues: Values): Future[Boolean] = Future {
  /* Takes a lot of time */
}

The obvious problem is explicitly set level of parallelism that may or may not be suitable for current execution context. If all futures are created at once how to make Future.fold to abort early?

like image 520
synapse Avatar asked Nov 12 '14 09:11

synapse


1 Answers

You can't cancel a Future, because Futures are read-only. But you can use a Promise which is "the write part from a Future".

Example code:

  • this code timeouts after 5 seconds because the futures are not done (solveIter never completes)
  • to complete a promise, remove comment which adds a completed promise to the 'promises' - the other futures will be cancel
  • remove 'promises.foreach(_.trySuccess(false))' and you get the timeout again, because the other futures doesn't getting cancel

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}
import scala.util.Try


// create a bunch of promises
val promises = ((1 to 10) map { _ =>
  val p = Promise[Boolean]()
  p.completeWith(solveIter())
  p
}) // :+ Promise().success(true)
// ^^ REMOVE THIS COMMENT TO ADD A PROMISE WHICH COMPLETES

// get the futures from the promises
val futures = promises.map(_.future)

// loop over all futures
futures.foreach(oneFuture =>
  // register callback when future is done
  oneFuture.foreach{
    case true =>
      println("future with 'true' result found")

      // stop others
      promises.foreach(_.trySuccess(false))

    case _ => // future completes with false
  })



// wait at most 5 seconds till all futures are done
Try(Await.ready(Future.sequence(futures), 5.seconds)).recover { case _ =>
  println("TIMEOUT")
}


def solveIter(): Future[Boolean] = Future {
  /* Takes a VERY VERY VERY .... lot of time */
  Try(Await.ready(Promise().future, Duration.Inf))
  false
}
like image 151
j-keck Avatar answered Sep 29 '22 11:09

j-keck