
Akka Actor's receive method interaction with a Future (block) - can new messages come in before the Future completes?

I wrote the following code to find out how an actor handles incoming messages while it runs some async work within a Future (the async work is meant to model Slick's async database API):

  val actor = system.actorOf(Props[WTFAsyncActor])

  for (i <- 1 until 100) {
    actor ! 'wtf + i.toString
    Thread.sleep(200L) // yes I know actors should not invoke this method
  }
  Thread.sleep(10000L) // same as above

The Actor's code is as follows:

import akka.actor.{Actor, ActorLogging}
import scala.concurrent.Future

class WTFAsyncActor extends Actor with ActorLogging {
  import scala.concurrent.ExecutionContext.Implicits.global
  override def receive: Receive = {

    case i@_ =>

      log.info(s"[$i] external block pre future")
      Thread.`yield`() // just trying to relinquish control to induce context switching

      Future {
        log.info(s"[$i] internal block pre sleep")
        Thread.sleep(1000L) // yeah bad - trying to emulate something meaningful like slick
        log.info(s"[$i] internal block post sleep")
      }

      log.info(s"[$i] external block post future")
  }
}  

I get the following logs (an excerpt from a sample run):

10:17:58.408 [wtf-async-test-akka.actor.default-dispatcher-2] INFO  org.wtf.test.WTFAsyncActor - ['wtf1] external block pre future
10:17:58.420 [wtf-async-test-akka.actor.default-dispatcher-2] INFO  org.wtf.test.WTFAsyncActor - ['wtf1] external block post future
10:17:58.421 [wtf-async-test-akka.actor.default-dispatcher-2] INFO  org.wtf.test.WTFAsyncActor - ['wtf1] internal block pre sleep
10:17:58.599 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf2] external block pre future
10:17:58.600 [wtf-async-test-akka.actor.default-dispatcher-2] INFO  org.wtf.test.WTFAsyncActor - ['wtf2] external block post future
10:17:58.600 [wtf-async-test-akka.actor.default-dispatcher-2] INFO  org.wtf.test.WTFAsyncActor - ['wtf2] internal block pre sleep
10:17:58.800 [wtf-async-test-akka.actor.default-dispatcher-2] INFO  org.wtf.test.WTFAsyncActor - ['wtf3] external block pre future
10:17:58.801 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf3] external block post future
10:17:58.801 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf3] internal block pre sleep
10:17:59.001 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf4] external block pre future
10:17:59.002 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf4] external block post future
10:17:59.002 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf4] internal block pre sleep
10:17:59.202 [wtf-async-test-akka.actor.default-dispatcher-2] INFO  org.wtf.test.WTFAsyncActor - ['wtf5] external block pre future
10:17:59.206 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf5] external block post future
10:17:59.402 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf6] external block pre future
10:17:59.403 [wtf-async-test-akka.actor.default-dispatcher-2] INFO  org.wtf.test.WTFAsyncActor - ['wtf6] external block post future
10:17:59.421 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf1] internal block post sleep
10:17:59.421 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf6] internal block pre sleep
10:17:59.607 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf2] internal block post sleep
10:17:59.607 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf5] internal block pre sleep
10:17:59.608 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf7] external block pre future

I think it is safe to say that new messages are handled as they come in, as long as some thread is available, even if the Future block has not yet finished executing. Am I right?

What I set out to uncover is whether the receive block must wait for the Future block to finish before handling new messages. It seems that it does not (for example, wtf2 comes in on dispatcher thread 3 before wtf1 completes its calculation on thread 3).

Are there any caveats to this behavior?

Please excuse what might seem like a silly question. I did not look under the hood of the Akka code base (I am way too new to Scala and Akka for that at the moment).

Yaneeve asked Jul 26 '15 07:07


1 Answer

Yes: receive can process subsequent messages before any Future completes. There are not really any caveats to this behaviour, but there are things to watch out for. In particular, do not close over any of your actor's mutable state within a Future. The Future may execute concurrently with the actor's message processing, which breaks the guarantee of single-threaded access to the actor's state and can lead to race conditions that leave that state invalid.
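
One way to keep mutable state out of a Future is to let the Future compute an immutable result and send it back to the actor as an ordinary message, using `pipeTo` from `akka.pattern`. The sketch below is not from the question: it assumes classic Akka actors on the classpath, and the names `Lookup`, `LookupDone`, `GetCompleted` and `PipingActor` are hypothetical, introduced only for illustration.

```scala
import akka.actor.{Actor, ActorLogging, Status}
import akka.pattern.pipe
import scala.concurrent.Future

// Hypothetical messages, introduced only for this sketch.
final case class Lookup(key: String)
final case class LookupDone(key: String, value: Int)
case object GetCompleted

class PipingActor extends Actor with ActorLogging {
  // Use the actor's dispatcher as the ExecutionContext for Future and pipeTo.
  import context.dispatcher

  // Mutable state: read and written only while a message is being processed.
  private var completed = 0

  override def receive: Receive = {
    case Lookup(key) =>
      // The Future body touches only the immutable `key`, never `completed`.
      // key.length is a stand-in for the real asynchronous work.
      Future(LookupDone(key, key.length)).pipeTo(self)

    case LookupDone(key, value) =>
      // Delivered through the mailbox, so mutating state here is safe.
      completed += 1
      log.info(s"[$key] finished with $value, $completed completed so far")

    case Status.Failure(cause) =>
      // pipeTo delivers a failed Future as akka.actor.Status.Failure.
      log.error(cause, "async work failed")

    case GetCompleted =>
      sender() ! completed
  }
}
```

Because the result arrives through the mailbox, the counter is only ever touched by one message handler at a time, even though other `Lookup` messages may be handled while the Future is still running.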

When you want to launch asynchronous work from within an actor, a common pattern is to start a child actor to do the work. The child actor can send messages back to its parent with any results, and you know for sure that the child actor cannot interfere with the state of the parent actor.
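
A minimal sketch of the child-actor pattern described above, again assuming classic Akka actors. The names `Job`, `JobResult`, `GetFinished`, `Worker` and `Parent` are hypothetical, and `id.length` merely stands in for the real work.

```scala
import akka.actor.{Actor, ActorLogging, Props}

// Hypothetical messages, introduced only for this sketch.
final case class Job(id: String)
final case class JobResult(id: String, value: Int)
case object GetFinished

// Child: performs one job, reports the result to its parent, then stops.
class Worker extends Actor {
  override def receive: Receive = {
    case Job(id) =>
      val value = id.length // stand-in for slow work
      context.parent ! JobResult(id, value)
      context.stop(self)
  }
}

// Parent: owns the mutable state and only changes it inside receive.
class Parent extends Actor with ActorLogging {
  private var finished = 0

  override def receive: Receive = {
    case job: Job =>
      // One short-lived child per job; the parent keeps handling messages.
      context.actorOf(Props(classOf[Worker])) ! job

    case JobResult(id, value) =>
      finished += 1
      log.info(s"[$id] finished with $value, $finished finished so far")

    case GetFinished =>
      sender() ! finished
  }
}
```

The child never sees the parent's `finished` field. It communicates only through `JobResult` messages, which the parent processes one at a time.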

mattinbits answered Sep 20 '22 19:09