I have written the following code in order to try to ascertain the behavior of an actor with respect to incoming messages while invoking some async work within a Future (actually the async work is meant to model Slick's async database API):
val actor = system.actorOf(Props[WTFAsyncActor])
for (i <- 1 until 100) {
  actor ! 'wtf + i.toString
  Thread.sleep(200L) // yes I know actors should not invoke this method
}
Thread.sleep(10000L) // same as above
The Actor's code is as follows:
import akka.actor.{Actor, ActorLogging}
import scala.concurrent.Future

class WTFAsyncActor extends Actor with ActorLogging {
  import scala.concurrent.ExecutionContext.Implicits.global

  override def receive: Receive = {
    case i@_ =>
      log.info(s"[$i] external block pre future")
      Thread.`yield`() // just trying to relinquish control to induce context switching
      Future {
        log.info(s"[$i] internal block pre sleep")
        Thread.sleep(1000L) // yeah bad - trying to emulate something meaningful like slick
        log.info(s"[$i] internal block post sleep")
      }
      log.info(s"[$i] external block post future")
  }
}
I get the following logs (an excerpt from a sample run):
10:17:58.408 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf1] external block pre future
10:17:58.420 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf1] external block post future
10:17:58.421 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf1] internal block pre sleep
10:17:58.599 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf2] external block pre future
10:17:58.600 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf2] external block post future
10:17:58.600 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf2] internal block pre sleep
10:17:58.800 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf3] external block pre future
10:17:58.801 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf3] external block post future
10:17:58.801 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf3] internal block pre sleep
10:17:59.001 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf4] external block pre future
10:17:59.002 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf4] external block post future
10:17:59.002 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf4] internal block pre sleep
10:17:59.202 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf5] external block pre future
10:17:59.206 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf5] external block post future
10:17:59.402 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf6] external block pre future
10:17:59.403 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf6] external block post future
10:17:59.421 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf1] internal block post sleep
10:17:59.421 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf6] internal block pre sleep
10:17:59.607 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf2] internal block post sleep
10:17:59.607 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf5] internal block pre sleep
10:17:59.608 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf7] external block pre future
I think it is safe to say that new messages are handled as they come in, as long as some thread is available, regardless of whether the Future block has finished executing. Am I right?
What I had set out to uncover is whether the receive block must wait for the Future block to finish before handling new messages. It seems that it does not. (wtf2 comes in on dispatcher thread 3 before wtf1 completes calculation on thread 3, for example)
Are there any caveats to this behavior?
Please excuse what might seem like a silly question. I did not look under the hood of the Akka code base (I am way too new to Scala and Akka for that, at the moment).
Yes: subsequent messages can be processed by receive before any Future completes. There are not really any caveats to this behaviour, but there are things to watch out for. In particular, do not close over any of your actor's mutable state within a Future. The Future may be executed concurrently with messages being processed by the actor, breaking the guarantee of single-threaded access to the actor's state and leading to race conditions that may leave that state invalid.
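One way to keep the Future away from the actor's state is to have it produce an immutable result and send that result back to the actor as a message, using Akka's pipe pattern. The following is a minimal sketch against the classic actor API, not a definitive implementation; CountingActor and the messages Work, WorkDone and GetCompleted are hypothetical names invented for the illustration, and the sleep stands in for a real async call.

```scala
import akka.actor.{Actor, ActorLogging}
import akka.pattern.pipe
import scala.concurrent.Future

// Hypothetical message types, introduced only for this sketch.
final case class Work(id: Int)
final case class WorkDone(id: Int)
case object GetCompleted

class CountingActor extends Actor with ActorLogging {
  // Same execution context as in the question; pipeTo also needs one.
  import scala.concurrent.ExecutionContext.Implicits.global

  // Mutable state: only touched inside receive, never inside a Future.
  private var completed = 0

  override def receive: Receive = {
    case Work(id) =>
      // The Future body uses only the immutable `id`. It neither reads nor
      // writes `completed`, and it does not call sender().
      Future {
        Thread.sleep(100) // stand-in for an async call such as a database query
        WorkDone(id)
      }.pipeTo(self) // the result arrives later as an ordinary message

    case WorkDone(id) =>
      // Processed like any other message, one at a time, so mutating is safe.
      completed += 1
      log.info(s"[$id] done, completed = $completed")

    case GetCompleted =>
      sender() ! completed
  }
}
```

If the Future fails, pipeTo delivers an akka.actor.Status.Failure to the actor instead of WorkDone, so a real actor would probably want a case for that as well.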
Where you want to asynchronously launch some work from within an Actor, a common pattern is to launch a child actor to do the work. The child actor can send messages back to its parent with any results, and you know for sure that the child actor cannot interfere with the state of the parent actor.
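The child-actor pattern described above could look roughly like this. It is a sketch under assumptions: Parent, Worker, Job, JobResult and GetResults are hypothetical names, one short-lived child is created per job, and the sleep stands in for the real work.

```scala
import akka.actor.{Actor, Props}

// Hypothetical message types, introduced only for this sketch.
final case class Job(id: Int)
final case class JobResult(id: Int, value: String)
case object GetResults

object Worker {
  // Props are built outside the parent so nothing from it is closed over.
  def props: Props = Props(new Worker)
}

// Child: performs one job, reports to its parent, then stops itself.
class Worker extends Actor {
  override def receive: Receive = {
    case Job(id) =>
      // Stand-in for the real work. Genuinely blocking work would normally
      // run on a dedicated dispatcher rather than the default one.
      Thread.sleep(100)
      context.parent ! JobResult(id, s"result-$id")
      context.stop(self)
  }
}

class Parent extends Actor {
  // Only the parent touches this; the child communicates by message only.
  private var results = Map.empty[Int, String]

  override def receive: Receive = {
    case job: Job =>
      // Hand the job to a fresh child and return immediately,
      // leaving the parent free to process further messages.
      context.actorOf(Worker.props) ! job

    case JobResult(id, value) =>
      results += (id -> value)

    case GetResults =>
      sender() ! results
  }
}
```

The parent stays responsive while children work, and its state changes only when a JobResult is processed as a normal message.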