Trying to get a handle on how to think in terms of actors instead of threads. I'm a bit stumped on the following use case:
Consider a system that has a producer process that creates work (e.g. by reading data from a file), and a number of worker processes that consume the work (e.g. by parsing the data and writing it to a database). The rates at which work is produced and consumed can vary, and the system should remain robust to this. For example, if the workers can't keep up, the producer should detect this and eventually slow down or wait.
This is pretty easy to implement with threads:
val producer: Iterator[Work] = createProducer()
val queue = new LinkedBlockingQueue[Work](QUEUE_SIZE)
val workers = (0 until NUM_WORKERS) map { i =>
  new Thread() {
    // explicit result type is required because the body uses `return`
    override def run(): Unit = {
      while (true) {
        try {
          // take next unit of work, waiting if necessary
          val work = queue.take()
          process(work)
        }
        catch {
          case e: InterruptedException => return
        }
      }
    }
  }
}
// start the workers
workers.foreach(_.start())
while (producer.hasNext) {
  val work = producer.next()
  // add new unit of work, waiting if necessary
  queue.put(work)
}
while (!queue.isEmpty) {
  // poll until the queue is drained; queue.wait() would throw
  // IllegalMonitorStateException because no monitor is held here
  Thread.sleep(100)
}
// stop the workers
workers.foreach(_.interrupt())
There's nothing really wrong with this model, and I've successfully used it before. This example is probably overly verbose, as using an Executor or CompletionService would fit this task well. But I like the actor abstraction, and think it's easier to reason about in many cases. Is there a way to rewrite this example using actors, especially making sure that there are no buffer overflows (e.g. full mailboxes, dropped messages, etc)?
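The Executor variant mentioned above could look roughly like the following sketch. It is only one possible reading: the `Work` case class, the `runAll` helper and its parameters are hypothetical names made up for illustration, and using `CallerRunsPolicy` for throttling is an assumption, not something the original code does. With that policy, a full queue makes the submitting thread run the task itself, which slows the producer down instead of dropping work.

```scala
import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object BoundedExecutorSketch {
  // Hypothetical stand-in for the question's Work type.
  final case class Work(id: Int)

  // Submits every item from `producer` to a fixed-size pool with a bounded
  // backlog and returns true if all tasks finished before the timeout.
  def runAll(producer: Iterator[Work], numWorkers: Int, queueSize: Int)
            (process: Work => Unit): Boolean = {
    val pool = new ThreadPoolExecutor(
      numWorkers, numWorkers,                        // fixed number of worker threads
      0L, TimeUnit.MILLISECONDS,                     // keep-alive (unused for core threads)
      new ArrayBlockingQueue[Runnable](queueSize),   // bounded backlog of pending work
      new ThreadPoolExecutor.CallerRunsPolicy()      // queue full: the caller runs the task
    )
    while (producer.hasNext) {
      val work = producer.next()
      pool.execute(new Runnable {
        def run(): Unit = process(work)
      })
    }
    pool.shutdown()                                  // accept no new tasks, finish queued ones
    pool.awaitTermination(1, TimeUnit.MINUTES)       // wait for the workers to drain the queue
  }

  def main(args: Array[String]): Unit = {
    val processed = new AtomicInteger(0)
    val finished = runAll(Iterator.tabulate(1000)(Work(_)), numWorkers = 4, queueSize = 16) { _ =>
      processed.incrementAndGet()
      ()
    }
    println(s"finished=$finished processed=${processed.get}")
  }
}
```

Compared with the hand-written threads, `shutdown()` followed by `awaitTermination` replaces both the drain loop and the interrupts, so no worker is cut off in the middle of an item.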
Because actors process messages "offline" (i.e. the consumption of messages is unconnected to their being received), it's difficult to see how you could have an exact analog of the "producer waits for the consumers to catch up".
The only thing I can think of is that the consumers request the work from the producer actor (which uses reply):
case object MoreWorkPlease

class Consumer(prod: Producer) extends Actor {
  def act = {
    prod ! MoreWorkPlease
    loop {
      react {
        case Work(payload) => doStuff(payload); reply(MoreWorkPlease)
      }
    }
  }
}

class Producer extends Actor {
  def act = loop {
    react {
      case MoreWorkPlease => reply(Work(getNextItem))
    }
  }
}
This is not perfect, of course, because the producer does not "read forward" and only gets work when a consumer is ready for it. The usage would be something like:
val prod = new Producer
(1 to NUM_ACTORS).map(_ => new Consumer(prod)).foreach(_.start())
prod.start()