This is using Scala 2.8 Actors. I have a long running job which can be parallelized. It consists of about 650,000 units of work. I divide it into 2600 different separate subtasks, and for each of these I create a new actor:
actor {
  val range = (0L to total by limit)
  val latch = new CountDownLatch(range.length)
  range.foreach { offset =>
    actor {
      doExpensiveStuff(offset, limit)
      latch.countDown()
    }
  }
  latch.await()
}
This works fairly well, but overall takes over two hours to complete. The problem is that, in the meantime, any other actors I create for normal tasks appear to be starved by the initial 2600 actors, which are also patiently awaiting their turn on a thread but have been waiting longer than any new actor that comes along.
How might I go about avoiding this starvation?
UPDATE
Some folks have questioned the use of Actors at all, especially since the message-passing capability was not being used within the workers. I had assumed that the Actor was a very lightweight abstraction around a ThreadPool, at or near the same performance level as simply coding the ThreadPool-based execution manually. So I wrote a little benchmark:
import testing._
import java.util.concurrent._
import actors.Futures._

val count = 100000
val poolSize = 4
val numRuns = 100

val ActorTest = new Benchmark {
  def run = {
    (1 to count).map(i => future {
      i * i
    }).foreach(_())
  }
}

val ThreadPoolTest = new Benchmark {
  def run = {
    val queue = new LinkedBlockingQueue[Runnable]
    val pool = new ThreadPoolExecutor(
      poolSize, poolSize, 1, TimeUnit.SECONDS, queue)
    val latch = new CountDownLatch(count)
    (1 to count).foreach(i => pool.execute(new Runnable {
      override def run = {
        i * i
        latch.countDown()
      }
    }))
    latch.await()
    pool.shutdown() // release the pool's threads so they don't leak across runs
  }
}

List(ActorTest, ThreadPoolTest).map { b =>
  b.runBenchmark(numRuns).sum.toDouble / numRuns
}
// List[Double] = List(545.45, 44.35)
I used the Future abstraction in the ActorTest to avoid passing a message back to another actor to signal that the work was done. I was surprised to find that my Actor code was over 10 times slower. Note that I also created my ThreadPoolExecutor with the same pool size that the default Actor pool is created with.
Looking back, it seems like I've possibly overused the Actor abstraction. I'm going to look into using separate ThreadPools for these distinct, expensive, long-running tasks.
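A minimal sketch of that direction, assuming a stub doExpensiveStuff and an illustrative chunk size (650,000 units split into 2600 subtasks of 250, as in the question); the pool size of 4 is likewise a placeholder:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicLong

val unitsDone = new AtomicLong(0)
// Placeholder for the real expensive work; here it just tallies units processed.
def doExpensiveStuff(offset: Long, limit: Long): Unit = unitsDone.addAndGet(limit)

val total = 650000L
val limit = 250L // illustrative chunk size: 650,000 / 250 = 2600 subtasks

// A dedicated fixed-size pool keeps the heavy work off the default actor
// scheduler, so ordinary actors are never starved by the long-running job.
val heavyPool = Executors.newFixedThreadPool(4)
(0L until total by limit).foreach { offset =>
  heavyPool.execute(new Runnable {
    override def run(): Unit = doExpensiveStuff(offset, limit)
  })
}
heavyPool.shutdown()                          // stop accepting new tasks
heavyPool.awaitTermination(1, TimeUnit.HOURS) // block until all subtasks finish
```

This keeps the ordinary actors and the batch work on separate pools, so the 2600 queued subtasks can no longer delay unrelated actors.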
No matter how many actors you have, if you're not configuring your scheduling explicitly, all of them are backed by a single fork/join scheduler (running against a thread pool with capacity 4, if I'm not mistaken). That's where the starvation comes from.
From the comments on the default Actor implementation:

"The run-time system can be configured to use a larger thread pool size (for example, by setting the actors.corePoolSize JVM property). The scheduler method of the Actor trait can be overridden to return a ResizableThreadPoolScheduler, which resizes its thread pool to avoid starvation caused by actors that invoke arbitrary blocking methods. The actors.enableForkJoin JVM property can be set to false, in which case a ResizableThreadPoolScheduler is used by default to execute actors."
In addition: an interesting thread on schedulers at scala-lang.
It seems from your example that you don't actually need to use actors at all, as you are not passing messages to your work units, replying, or even looping.
Why not just create a load of Futures and then wait for them to finish? That way, the underlying fork/join pool is completely free to decide on the appropriate level of parallelism (i.e. the number of threads) for your system:
import actors.Futures._

def mkFuture(offset: Long) = future {
  doExpensiveStuff(offset, limit)
}
val fs = (0L to total by limit).map(mkFuture)
awaitAll(timeout, fs: _*) // wait for all the work to finish
Note that you will only benefit from running more tasks concurrently than your system has cores if the expensive work is not CPU-bound (for example, if it is IO-bound).
I have not used actors with that syntax, but by default I think all actors in Scala use a thread pool.
See How to designate a thread pool for actors