I am trying to use a divide-and-conquer (aka fork/join) approach for a number crunching problem. Here is the code:
import scala.actors.Futures.future
private def compute( input: Input ): Result = {
  if( pairs.size < SIZE_LIMIT ) {
    computeSequential()
  } else {
    val (input1, input2) = input.split
    val f1 = future( compute(input1) )
    val f2 = future( compute(input2) )
    val result1 = f1()
    val result2 = f2()
    merge(result1, result2)
  }
}
It runs (with a nice speed-up), but the future's apply method seems to block a thread, and the thread pool grows tremendously. When too many threads have been created, the computation gets stuck.
Is there a kind of react method for futures which releases the thread? Or any other way to achieve that behavior?
EDIT: I am using Scala 2.8.0.final.
Don't claim (apply) your Futures, since this forces them to block and wait for an answer; as you've seen, this can lead to deadlocks. Instead, use them monadically to tell them what to do when they complete. Instead of:
val result1 = f1()
val result2 = f2()
merge(result1,result2)
Try this:
for {
  result1 <- f1
  result2 <- f2
} yield merge(result1, result2)
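For reference, that for-comprehension is just sugar for nested flatMap/map calls; here is a sketch of the desugared form:

// Equivalent to the for-comprehension above: chain the second future
// onto the first and combine the two values once both are available
f1.flatMap { result1 =>
  f2.map { result2 =>
    merge(result1, result2)
  }
}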
The result of this will be a Responder[Result] (essentially a Future[Result]) containing the merged results; you can do something effectful with this final value using respond() or foreach(), or you can map() or flatMap() it to another Responder[T]. No blocking necessary, just keep scheduling computations for the future!
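For example, a rough sketch (the string formatting and println are just placeholder effects):

val merged: Responder[Result] =
  for { result1 <- f1; result2 <- f2 } yield merge(result1, result2)

// map chains another non-blocking transformation onto the eventual value
val report: Responder[String] = merged.map(r => "merged: " + r)

// respond/foreach schedule a side effect for when the value is ready;
// as with the claim helper below, call them from inside an actor or future block
scala.actors.Actor.actor { report.respond(s => println(s)) }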
OK, the signature of the compute function is going to have to change to Responder[Result] now, so how does that affect the recursive calls? Let's try this:
private def compute( input: Input ): Responder[Result] = {
  if( pairs.size < SIZE_LIMIT ) {
    future(computeSequential())
  } else {
    val (input1, input2) = input.split
    for {
      result1 <- compute(input1)
      result2 <- compute(input2)
    } yield merge(result1, result2)
  }
}
Now you no longer need to wrap the calls to compute with future(...), because they're already returning a Responder (a superclass of Future).
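Since Future[T] extends Responder[T], the base case still typechecks as written; a tiny illustration:

// future(...) gives a Future[Result], which is already a Responder[Result]
val base: Responder[Result] = future(computeSequential())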
One upshot of using this continuation-passing style is that your top-level code (whatever calls compute originally) doesn't block at all any more. If it's being called from main(), and that's all the program does, this will be a problem, because now it will just spawn a bunch of futures and then immediately shut down, having finished everything it was told to do. What you need to do is block on all these futures, but only once, at the top level, and only on the results of all the computations, not on any intermediate ones.
Unfortunately, this Responder thing that's being returned by compute() no longer has a blocking apply() method like the Future did. I'm not sure why flatMapping Futures produces a generic Responder instead of a Future; this seems like an API mistake. But in any case, you should be able to make your own:
def claim[A](r: Responder[A]): A = {
  import java.util.concurrent.ArrayBlockingQueue
  import scala.actors.Actor.actor

  val q = new ArrayBlockingQueue[A](1)
  // uses of 'respond' need to be wrapped in an actor or future block
  actor { r.respond(a => q.put(a)) }
  q.take
}
So now you can create a blocking call to compute in your main method like so:
val finalResult = claim(compute(input))
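Putting it all together, a minimal sketch of the top level (readInput here is a hypothetical stand-in for however you construct your Input):

def main(args: Array[String]): Unit = {
  val input: Input = readInput(args)        // hypothetical input construction
  val finalResult = claim(compute(input))   // blocks exactly once, at the top level
  println(finalResult)
}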