
How to prevent actor starvation in the presence of other long-running actors?

This is using Scala 2.8 Actors. I have a long running job which can be parallelized. It consists of about 650,000 units of work. I divide it into 2600 different separate subtasks, and for each of these I create a new actor:

import java.util.concurrent.CountDownLatch
import scala.actors.Actor.actor

actor {
  val range = (0L to total by limit)
  val latch = new CountDownLatch(range.length)
  range.foreach { offset =>
    actor {
      doExpensiveStuff(offset, limit)
      latch.countDown()
    }
  }
  latch.await()
}

This works fairly well, but the whole job takes over two hours to complete. The issue is that, in the meantime, any other actors I create for normal tasks seem to be starved out by the initial 2600 actors, which are also patiently awaiting their turn on a thread but have been waiting longer than any new actors that come along.

How might I go about avoiding this starvation?

Initial thoughts:

  • Instead of 2600 actors, use one actor that sequentially plows through the large pile of work. I'm not fond of this because I'd like this job to finish sooner by splitting it up.
  • Instead of 2600 actors, use two actors, each processing a different half of the total work set. This might work better, but what if my machine has 8 cores? I'd likely want to utilize more than that.
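A middle ground between these two ideas is to cap the worker count at the number of cores and deal the offsets out across that fixed set of actors. A rough sketch, reusing `total`, `limit`, and `doExpensiveStuff` from the snippet above (this still shares the default scheduler, so a newly created actor queues behind at most a handful of heavy tasks rather than 2600):

```scala
import java.util.concurrent.CountDownLatch
import scala.actors.Actor.actor

// Cap the number of worker actors at the core count instead of
// spawning one actor per chunk of work.
val workers = Runtime.getRuntime.availableProcessors
val offsets = (0L to total by limit).toList

// Deal the offsets round-robin into one slice per worker.
val slices = offsets.zipWithIndex.groupBy(_._2 % workers).values.toList
val latch = new CountDownLatch(slices.length)

slices.foreach { slice =>
  actor {
    slice.foreach { case (offset, _) => doExpensiveStuff(offset, limit) }
    latch.countDown()
  }
}
latch.await()
```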

UPDATE

Some folks have questioned the use of Actors at all, especially since the message-passing capability was not being used within the workers. I had assumed that an Actor was a very lightweight abstraction over a ThreadPool, performing at or near the level of coding the ThreadPool-based execution by hand. So I wrote a small benchmark:

import testing._
import java.util.concurrent._
import actors.Futures._

val count = 100000
val poolSize = 4
val numRuns = 100

val ActorTest = new Benchmark {
  def run = {
    (1 to count).map(i => future {
      i * i
    }).foreach(_())
  }
}

val ThreadPoolTest = new Benchmark {
  def run = {
    val queue = new LinkedBlockingQueue[Runnable]
    val pool = new ThreadPoolExecutor(
          poolSize, poolSize, 1, TimeUnit.SECONDS, queue)
    val latch = new CountDownLatch(count)
    (1 to count).foreach(i => pool.execute(new Runnable {
      override def run = {
        i * i
        latch.countDown()
      }
    }))
    latch.await()
  }
}

List(ActorTest,ThreadPoolTest).map { b =>
  b.runBenchmark(numRuns).sum.toDouble / numRuns
}

// List[Double] = List(545.45, 44.35)

I used the Future abstraction in ActorTest to avoid passing a message back to another actor to signal that the work was done. I was surprised to find that my Actor code was over ten times slower. Note that I created my ThreadPoolExecutor with the same pool size that the default Actor pool uses.

Looking back, it seems like I've possibly overused the Actor abstraction. I'm going to look into using separate ThreadPools for these distinct, expensive, long-running tasks.
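For what it's worth, the separate-ThreadPool route needs nothing beyond java.util.concurrent; here is a minimal sketch of that direction, where the `doExpensiveStuff` stub and the `total`/`limit` values are placeholders standing in for the real job:

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Placeholders standing in for the real workload.
def doExpensiveStuff(offset: Long, limit: Long): Unit = ()
val total = 650000L
val limit = 250L

// A dedicated pool for the long-running batch: the actor scheduler
// (and therefore every other actor) never sees these tasks.
val pool = Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors)
(0L to total by limit).foreach { offset =>
  pool.execute(new Runnable {
    override def run(): Unit = doExpensiveStuff(offset, limit)
  })
}
pool.shutdown()                          // stop accepting new tasks
pool.awaitTermination(1, TimeUnit.DAYS)  // block until the batch drains
```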

asked Nov 08 '10 by Collin

3 Answers

No matter how many actors you have, if you're not configuring your scheduling explicitly, all of them are backed by a single fork/join scheduler (running against a thread pool with capacity 4, if I'm not mistaken). That's where the starvation comes from.

  1. You should try different schedulers for your pool of actors to find the one that performs best (try ResizableThreadPoolScheduler if you want to maximize parallelism by using as many threads as possible)
  2. You need a separate scheduler for the huge pool of actors (the other actors in your system shouldn't use it)
  3. As @DaGGeRRz suggested, you may try the Akka framework, which offers configurable dispatchers (e.g., the work-stealing load-balancing dispatcher moves events from the mailboxes of busy actors to idle actors)

From the comments to default Actor implementation:

The run-time system can be configured to use a larger thread pool size (for example, by setting the actors.corePoolSize JVM property). The scheduler method of the Actor trait can be overridden to return a ResizableThreadPoolScheduler, which resizes its thread pool to avoid starvation caused by actors that invoke arbitrary blocking methods. The actors.enableForkJoin JVM property can be set to false, in which case a ResizableThreadPoolScheduler is used by default to execute actors.
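Points 1 and 2 together look roughly like this in the 2.8 API: give the heavy workers their own ResizableThreadPoolScheduler and leave everything else on the default fork/join scheduler. This is a sketch only; the constructor arguments of ResizableThreadPoolScheduler are an assumption and vary between 2.8 point releases, so check the scaladoc for your version:

```scala
import scala.actors.{Actor, IScheduler}
import scala.actors.scheduler.ResizableThreadPoolScheduler

// One scheduler shared by all heavy workers, started once.
object HeavyScheduler {
  val instance: IScheduler = {
    val s = new ResizableThreadPoolScheduler(false) // non-daemon (assumed signature)
    s.start()
    s
  }
}

// Mix this into the 2600 batch workers; ordinary actors keep the
// default scheduler and are no longer starved behind the batch.
trait HeavyWorker extends Actor {
  override def scheduler: IScheduler = HeavyScheduler.instance
}
```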

In addition: an interesting thread on schedulers at scala-lang.

answered by Vasil Remeniuk

It seems from your example that you don't actually need actors at all: you are not passing messages to your work units, replying, or even looping.

Why not just create a load of Futures and then wait on them finishing? That way, the underlying Fork Join Pool is completely free to decide on the appropriate level of parallelism (i.e. # of threads) for your system:

import actors.Futures._

def mkFuture(offset: Long) = future {
  doExpensiveStuff(offset, limit)
}
val fs = (0L to total by limit).map(mkFuture)
awaitAll(timeout, fs: _*) // wait for all the work to finish

Note that processing more tasks concurrently than your system has cores will only help if the expensive work is not CPU-bound (it may be IO-bound, for example).

answered by oxbow_lakes


I have not used actors with that syntax, but by default I believe all actors in Scala share a single thread pool.

See How to designate a thread pool for actors

answered by Ben Jackson