
React for futures

I am trying to use a divide-and-conquer (aka fork/join) approach for a number crunching problem. Here is the code:

import scala.actors.Futures.future

private def compute( input: Input ):Result = {
  if( pairs.size < SIZE_LIMIT ) {
    computeSequential()
  } else {
    val (input1,input2) = input.split
    val f1 = future( compute(input1) )
    val f2 = future( compute(input2) )
    val result1 = f1()
    val result2 = f2()
    merge(result1,result2)
  }
}

It runs (with a nice speed-up), but the future's apply method seems to block a thread, and the thread pool grows tremendously. When too many threads are created, the computation gets stuck.

Is there a kind of react method for futures that releases the thread? Or any other way to achieve that behavior?

EDIT: I am using scala 2.8.0.final

paradigmatic asked Oct 03 '10 21:10


1 Answer

Don't claim (apply) your Futures: applying a Future blocks the calling thread until the answer is available, and as you've seen, this can lead to deadlocks. Instead, use them monadically to tell them what to do when they complete. Instead of:

val result1 = f1()
val result2 = f2()
merge(result1,result2)

Try this:

for {
  result1 <- f1
  result2 <- f2
} yield merge(result1, result2)

The result of this will be a Responder[Result] (essentially a Future[Result]) containing the merged results; you can do something effectful with this final value using respond() or foreach(), or you can map() or flatMap() it to another Responder[T]. No blocking necessary, just keep scheduling computations for the future!
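As a minimal sketch of consuming such a Responder, assuming the Scala 2.8 scala.actors API used in the question: the object name ResponderSketch, the helper combined, and the two toy computations are hypothetical stand-ins for compute(input1), compute(input2) and merge.

```scala
import scala.actors.Actor.actor
import scala.actors.Futures.future

// Hypothetical example object; not part of the original code.
object ResponderSketch {

  // Composes two futures without blocking; the result is a Responder[Int].
  def combined(): Responder[Int] = {
    val f1 = future { 1 + 2 } // stand-in for compute(input1)
    val f2 = future { 3 + 4 } // stand-in for compute(input2)
    for {
      a <- f1
      b <- f2
    } yield a + b             // stand-in for merge(result1, result2)
  }

  def main(args: Array[String]) {
    // respond on a Future needs an actor context, so wrap it in an actor block.
    actor {
      combined().respond(sum => println("merged result: " + sum))
    }
  }
}
```

The callback passed to respond runs once both futures have completed, so no thread sits blocked waiting for intermediate results.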

Edit 1:

OK, the return type of the compute function has to change to Responder[Result] now, so how does that affect the recursive calls? Let's try this:

private def compute( input: Input ):Responder[Result] = {
  if( pairs.size < SIZE_LIMIT ) {
    future(computeSequential())
  } else {
    val (input1,input2) = input.split
    for {
      result1 <- compute(input1)
      result2 <- compute(input2)
    } yield merge(result1, result2)
  }
}

Now you no longer need to wrap the calls to compute with future(...) because they're already returning Responder (a superclass of Future).

Edit 2:

One upshot of using this continuation-passing style is that your top-level code--whatever calls compute originally--doesn't block at all any more. If it's being called from main(), and that's all the program does, this will be a problem, because now it will just spawn a bunch of futures and then immediately shut down, having finished everything it was told to do. What you need to do is block on all these futures, but only once, at the top level, and only on the results of all the computations, not any intermediate ones.

Unfortunately, this Responder thing that's being returned by compute() no longer has a blocking apply() method like the Future did. I'm not sure why flatMapping Futures produces a generic Responder instead of a Future; this seems like an API mistake. But in any case, you should be able to make your own:

def claim[A](r: Responder[A]): A = {
  import java.util.concurrent.ArrayBlockingQueue
  import scala.actors.Actor.actor

  // a one-slot queue hands the result from the actor back to the caller
  val q = new ArrayBlockingQueue[A](1)
  // uses of 'respond' need to be wrapped in an actor or future block
  actor { r.respond(a => q.put(a)) }
  // blocks the calling thread until the result has been put
  q.take
}

So now you can create a blocking call to compute in your main method like so:

val finalResult = claim(compute(input))
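To see the pieces working together, here is a small end-to-end sketch under the same Scala 2.8 scala.actors assumptions. It sums a list by divide and conquer; the names ForkJoinSum, SizeLimit and sum are hypothetical and merely play the roles of SIZE_LIMIT, compute and merge from the question, while claim is the helper defined above.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.actors.Actor.actor
import scala.actors.Futures.future

// Hypothetical toy problem illustrating the pattern; not the asker's code.
object ForkJoinSum {
  val SizeLimit = 4 // assumed threshold below which work is done directly

  // Same shape as compute: a future at the leaves, composition above them.
  def sum(xs: List[Int]): Responder[Int] =
    if (xs.size < SizeLimit) {
      future(xs.sum)
    } else {
      val (left, right) = xs.splitAt(xs.size / 2)
      for {
        a <- sum(left)
        b <- sum(right)
      } yield a + b
    }

  // Blocks exactly once, at the top level, on the final result.
  def claim[A](r: Responder[A]): A = {
    val q = new ArrayBlockingQueue[A](1)
    actor { r.respond(a => q.put(a)) }
    q.take
  }

  def main(args: Array[String]) {
    println(claim(sum((1 to 100).toList)))
  }
}
```

Only the call to claim in main blocks; every intermediate result is passed along through callbacks.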
Tom Crockett answered Nov 07 '22 14:11