
Incremental processing in an akka actor

Tags: scala, akka, actor

I have actors that need to do very long-running, computationally expensive work, but the computation can be done incrementally. While the complete computation takes hours, the intermediate results are extremely useful, and I'd like to be able to respond to requests for them at any time. This is the pseudocode of what I want to do:

var intermediateResult = ...
loop {
     while (mailbox.isEmpty && computationNotFinished)
       intermediateResult = computationStep(intermediateResult)


     receive {
         case GetCurrentResult => sender ! intermediateResult
         ...other messages...
     }
 }
Heptic, asked Oct 12 '12 03:10



1 Answer

The best way to do this is very close to what you are doing already:

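import akka.actor.Actor

// IntermediateState and ToDo are placeholders for your own types.
// Work is the initial request; its shape is assumed here.
case class Work(todo: ToDo)
case class Continue(todo: ToDo)
// abstract because calc is left for the concrete worker to implement
abstract class Worker extends Actor {
  var state: IntermediateState = _
  def receive = {
    case Work(todo) =>
      // do the first step, then send the remainder to self so that
      // other messages in the mailbox get a turn in between
      val (next, rest) = calc(state, todo)
      state = next
      self ! Continue(rest)
    case Continue(todo) if todo.isEmpty => // done
    case Continue(todo) =>
      val (next, rest) = calc(state, todo)
      state = next
      self ! Continue(rest)
  }
  // performs one step and returns the new state plus the remaining work
  def calc(state: IntermediateState, todo: ToDo): (IntermediateState, ToDo)
}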
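
The question also asks how to answer GetCurrentResult while the computation is still running. The following is a minimal single-threaded sketch of why the Continue pattern allows that. It is not Akka code: ToyWorker, tell, processOne and current are hypothetical names, a plain queue stands in for the mailbox, and a Fibonacci step stands in for the real computation. Because every Continue goes to the back of the queue, a request that arrived in the meantime is handled between two steps.

```scala
import scala.collection.mutable

// Messages: Work and Continue mirror the answer, GetCurrentResult comes from the question.
sealed trait Msg
final case class Work(steps: Int) extends Msg
final case class Continue(remaining: Int) extends Msg
case object GetCurrentResult extends Msg

// Toy stand-in for an actor: one mailbox, one message handled at a time.
class ToyWorker {
  val mailbox = mutable.Queue.empty[Msg]
  val replies = mutable.ArrayBuffer.empty[BigInt]
  private var state: (BigInt, BigInt) = (BigInt(1), BigInt(1))

  def current: BigInt = state._1

  // Like `actor ! msg`: append to the back of the mailbox.
  def tell(m: Msg): Unit = mailbox += m

  // Handle exactly one message, as a single invocation of receive would.
  def processOne(): Unit = mailbox.dequeue() match {
    case Work(n)     => tell(Continue(n))
    case Continue(0) => // done
    case Continue(n) =>
      state = (state._2, state._1 + state._2) // one incremental step
      tell(Continue(n - 1))                   // queued behind any waiting request
    case GetCurrentResult => replies += state._1
  }
}
```

If GetCurrentResult is enqueued while a Continue is already waiting, the worker performs that one step first and then replies with the intermediate result, before carrying on with the remaining steps.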

EDIT: more background

When an actor sends messages to itself, Akka’s internal processing basically runs them within a while loop. The number of messages processed in one go is determined by the throughput setting of the actor’s dispatcher (default: 5); after that many messages, the thread is returned to the pool and the continuation is enqueued to the dispatcher as a new task. Hence there are two tunables in the above solution:

  • process multiple steps for a single message (if processing steps are REALLY small)
  • increase throughput setting for increased throughput and decreased fairness
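
The fairness trade-off can be illustrated with a toy model of a single dispatcher thread. This is only a sketch and not Akka's implementation: DispatcherModel, Slot and schedule are made-up names, and each mailbox is reduced to a count of pending messages.

```scala
import scala.collection.mutable

object DispatcherModel {
  // One actor's mailbox, reduced to a name and a count of pending messages.
  final case class Slot(actor: String, pending: Int)

  // Returns the order in which messages are processed on a single thread.
  def schedule(pending: Seq[(String, Int)], throughput: Int): Vector[String] = {
    require(throughput > 0, "throughput must be positive")
    val runQueue = mutable.Queue.empty[Slot]
    pending.foreach { case (actor, n) => if (n > 0) runQueue += Slot(actor, n) }
    val trace = Vector.newBuilder[String]
    while (runQueue.nonEmpty) {
      val slot = runQueue.dequeue()
      // An actor keeps the thread for at most `throughput` messages.
      val batch = math.min(slot.pending, throughput)
      (1 to batch).foreach(_ => trace += slot.actor)
      // Leftover work is re-enqueued as a new task behind the other actors.
      if (slot.pending > batch) runQueue += slot.copy(pending = slot.pending - batch)
    }
    trace.result()
  }
}
```

With four pending messages for a and two for b, a throughput of 2 gives the order a a b b a a, while a throughput of 4 gives a a a a b b: the larger the setting, the longer b waits for its turn.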

The original problem seems to involve hundreds of such actors, presumably running on commodity hardware that does not have hundreds of CPUs, so the throughput setting should probably be chosen such that each batch takes no longer than about 10ms.
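
As a rough worked example of that rule of thumb: if one message takes a known amount of time, the largest throughput that keeps a batch within the budget is the budget divided by the step time. BatchBudget and throughputFor are hypothetical helpers, and the 10ms default is simply the figure suggested above; the step time has to be measured for your own workload.

```scala
object BatchBudget {
  // Largest throughput whose batch (throughput * step time) stays within
  // the budget; never less than 1, since at least one message is processed.
  def throughputFor(stepMicros: Long, budgetMicros: Long = 10000L): Int = {
    require(stepMicros > 0, "step time must be positive")
    math.max(1L, budgetMicros / stepMicros).toInt
  }
}
```

For instance, a step of 2ms (2000 microseconds) allows a throughput of 5, a step of 100 microseconds allows 100, and a step longer than the budget still yields 1. The resulting value would then go into the dispatcher's throughput setting in the configuration.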

Performance Assessment

Let’s play a bit with Fibonacci:

Welcome to Scala version 2.10.0-RC1 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_07).
Type in expressions to have them evaluated.
Type :help for more information.

scala> def fib(x1: BigInt, x2: BigInt, steps: Int): (BigInt, BigInt) = if(steps>0) fib(x2, x1+x2, steps-1) else (x1, x2)
fib: (x1: BigInt, x2: BigInt, steps: Int)(BigInt, BigInt)

scala> def time(code: =>Unit) { val start = System.currentTimeMillis; code; println("code took " + (System.currentTimeMillis - start) + "ms") }
time: (code: => Unit)Unit

scala> time(fib(1, 1, 1000))
code took 1ms

scala> time(fib(1, 1, 1000))
code took 1ms

scala> time(fib(1, 1, 10000))
code took 5ms

scala> time(fib(1, 1, 100000))
code took 455ms

scala> time(fib(1, 1, 1000000))
code took 17172ms

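This means that even in a presumably well-optimized loop, fib_100000 takes about half a second. Now let’s play a bit with actors (the session assumes that akka.actor.ActorDSL._ and scala.concurrent.duration._ have been imported and that an implicit ActorSystem is in scope):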

scala> case class Cont(steps: Int, batch: Int)
defined class Cont

scala> val me = inbox()
me: akka.actor.ActorDSL.Inbox = akka.actor.dsl.Inbox$Inbox@32c0fe13

scala> val a = actor(new Act {
  var s: (BigInt, BigInt) = _
  become {
    case Cont(x, y) if y < 0 => s = (1, 1); self ! Cont(x, -y)
    case Cont(x, y) if x > 0 => s = fib(s._1, s._2, y); self ! Cont(x - 1, y)
    case _: Cont => me.receiver ! s
   }
})
a: akka.actor.ActorRef = Actor[akka://repl/user/$c]

scala> time{a ! Cont(1000, -1); me.receive(10 seconds)}
code took 4ms

scala> time{a ! Cont(10000, -1); me.receive(10 seconds)}
code took 27ms

scala> time{a ! Cont(100000, -1); me.receive(10 seconds)}
code took 632ms

scala> time{a ! Cont(1000000, -1); me.receive(30 seconds)}
code took 17936ms

This is already interesting: given a long enough time per step (with the huge BigInts behind the scenes in the last line), actors don’t add much overhead. Now let’s see what happens when doing smaller calculations in a more batched way:

scala> time{a ! Cont(10000, -10); me.receive(30 seconds)}
code took 462ms

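This is pretty close to the result for the direct variant above: 10000 messages of 10 steps each perform the same 100000 steps as fib(1, 1, 100000), which took 455ms when called directly.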
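
To make the meaning of Cont(steps, batch) concrete, here is a small sketch without actors showing that splitting the work into batches does not change the result, only the number of messages. FibBatches and viaMessages are hypothetical names; viaMessages models what the actor computes for Cont(messages, -batch).

```scala
import scala.annotation.tailrec

object FibBatches {
  // Same function as in the REPL session above.
  @tailrec
  def fib(x1: BigInt, x2: BigInt, steps: Int): (BigInt, BigInt) =
    if (steps > 0) fib(x2, x1 + x2, steps - 1) else (x1, x2)

  // Start from (1, 1), then apply `batch` steps once per self-sent message.
  def viaMessages(messages: Int, batch: Int): (BigInt, BigInt) =
    (1 to messages).foldLeft((BigInt(1), BigInt(1))) {
      case ((a, b), _) => fib(a, b, batch)
    }
}
```

Here viaMessages(1000, 10) equals fib(1, 1, 10000), so the batch size is purely a knob for how much work each message carries.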

Conclusion

Sending messages to self is NOT expensive for almost all applications. Just keep the actual processing step slightly larger than a few hundred nanoseconds.

Roland Kuhn, answered Oct 18 '22 19:10
