I'm implementing an algorithm that is easy to parallelize, but I can't figure out how to create the right number of futures or how to abort early. At the moment the code is outlined along these lines:
def solve: Boolean = {
  var result = false
  var i = 0
  while (!result && i < iterations) {
    // start one batch of futures and wait for the whole batch to finish
    val futures = (1 to threads) map { _ => solveIter(getInitialValues()) }
    val loopResult = Future.fold(futures)(false)((acc, r) => acc || r)
    result = Await.result(loopResult, Duration.Inf)
    i += 1
  }
  result
}
def solveIter(initialValues: Values): Future[Boolean] = Future {
  /* Takes a lot of time */
}
The obvious problem is that the level of parallelism is set explicitly and may or may not suit the current execution context. And if all futures are created at once, how can Future.fold be made to abort early?
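You can't cancel a Future, because a Future is a read-only handle on a result. You can, however, use a Promise, which is the writable side of a Future: whoever holds the Promise decides when, and with what value, its Future completes. Note that completing a Promise early only changes what its Future reports; it does not interrupt the computation that was feeding it.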
Example code:
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}
import scala.util.Try

// stand-in for the real work: blocks forever, so it never returns on its own
def solveIter(): Future[Boolean] = Future {
  /* Takes a VERY VERY VERY .... lot of time */
  Try(Await.ready(Promise().future, Duration.Inf))
  false
}

// create a bunch of promises, each completed by its own solveIter() future
val promises = ((1 to 10) map { _ =>
  val p = Promise[Boolean]()
  p.completeWith(solveIter())
  p
}) // :+ Promise[Boolean]().success(true)
// ^^ uncomment the ":+ ..." part to add a promise that is already completed with true

// get the futures from the promises
val futures = promises.map(_.future)

// loop over all futures
futures.foreach(oneFuture =>
  // register a callback that runs when the future is done
  oneFuture.foreach {
    case true =>
      println("future with 'true' result found")
      // complete the remaining promises with false; completed ones are left untouched
      promises.foreach(_.trySuccess(false))
    case _ => // future completed with false
  })

// wait at most 5 seconds until all futures are done
Try(Await.ready(Future.sequence(futures), 5.seconds)).recover { case _ =>
  println("TIMEOUT")
}
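The example above makes the waiting side return early, but the work inside each Future keeps running. One way to get both effects is sketched below, assuming the real work can poll a shared flag between steps. The names EarlyAbort, firstTrue, and the two-argument solveIter are hypothetical and not part of the question's code; the loop body is only a placeholder for real work.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Success

object EarlyAbort {
  /** Completes with true as soon as any future yields true; completes with
    * false once every future has finished (or failed) without a true. */
  def firstTrue(futures: Seq[Future[Boolean]])(implicit ec: ExecutionContext): Future[Boolean] = {
    val result = Promise[Boolean]()
    val remaining = new AtomicInteger(futures.size)
    if (futures.isEmpty) result.trySuccess(false)
    futures.foreach { f =>
      f.onComplete {
        case Success(true) => result.trySuccess(true)
        // false results and failures both count as "no solution from this task"
        case _ => if (remaining.decrementAndGet() == 0) result.trySuccess(false)
      }
    }
    result.future
  }

  /** Hypothetical stand-in for the real solveIter: polls `stop` between
    * steps and gives up once another task has found a solution. */
  def solveIter(id: Int, stop: AtomicBoolean)(implicit ec: ExecutionContext): Future[Boolean] = Future {
    var step = 0
    var found = false
    while (!found && !stop.get() && step < 100) {
      Thread.sleep(1)                // placeholder for one unit of real work
      step += 1
      found = id == 3 && step == 10  // pretend only task 3 ever succeeds
    }
    if (found) stop.set(true)        // ask the other tasks to stop early
    found
  }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val stop = new AtomicBoolean(false)
    val futures = (1 to 8).map(id => solveIter(id, stop))
    val solved = Await.result(firstTrue(futures), 10.seconds)
    println(s"solved = $solved")
  }
}
```

With this shape the number of futures no longer has to match the number of threads: the execution context schedules however many tasks are submitted onto its own pool, and tasks that start after the flag is set return almost immediately. Future.fold (deprecated since Scala 2.12 in favour of Future.foldLeft) always waits for every input future, which is why the sketch uses a Promise instead.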