Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Transition from thread model to actors

Trying to get a handle on how to think in terms of actors instead of threads. I'm a bit stumped on the following use case:

Consider a system that has a producer process that creates work (e.g. by reading data from a file), and a number of worker processes that consume the work (e.g. by parsing the data and writing it to a database). The rates at which work is produced and consumed can vary, and the system should remain robust to this. For example, if the workers can't keep up, the producer should detect this and eventually slow down or wait.

This is pretty easy to implement with threads:

val producer:Iterator[Work] = createProducer()
val queue = new LinkedBlockingQueue[Work](QUEUE_SIZE)
val workers = (0 until NUM_WORKERS) map { i =>
  new Thread() { 
    override def run() = {
      while (true) {
        try {
          // take next unit of work, waiting if necessary
          val work = queue.take()
          process(work)
        }
        catch {
          case e:InterruptedException => return
        }
      }
    }
  }
}

// start the workers
workers.foreach(_.start())

while (producer.hasNext) {
  val work = producer.next()
  // add new unit of work, waiting if necessary
  queue.put(work)
}

while (!queue.isEmpty) {
  // wait until queue is drained
  queue.wait()
}

// stop the workers
workers.foreach(_.interrupt())

There's nothing really wrong with this model, and I've successfully used it before. This example is probably overly verbose, as using an Executor or CompletionService would fit this task well. But I like the actor abstraction, and think it's easier to reason about in many cases. Is there a way to rewrite this example using actors, especially making sure that there are no buffer overflows (e.g. full mailboxes, dropped messages, etc)?

like image 862
toluju Avatar asked Nov 11 '10 21:11

toluju


1 Answers

Because actors process messages "offline" (i.e. the consumption of messages is unconnected to their being received), it's difficult to see how you could have an exact analog of the "producer waits for the consumers to catch up".

The only thing I can think of is that the consumers request the work from the producer actor (which uses reply):

case object MoreWorkPlease
class Consumer(prod : Producer) extends Actor {
  def act = {
    prod ! MoreWorkPlease
    loop {
      react {
        case Work(payload) => doStuff(payload); reply(MoreWorkPlease)
      }
    }
  }
}

class Producer extends Actor {
  def act = loop {
    react {
      case MoreWorkPlease => reply(Work(getNextItem))
    }
  }
}

This is not perfect, of course, because the producer does not "read forward" and only gets work when a consumer is ready for it. The usage would be something like:

val prod = new Producer
(1 to NUM_ACTORS).map(new Consumer(prod)).foreach(_.start())
prod.start()
like image 197
oxbow_lakes Avatar answered Sep 29 '22 07:09

oxbow_lakes