I'm just starting to learn Akka Actors in Scala. My understanding is that messages received by an Actor are queued in an Actor's mailbox, and processed one at a time. By processing messages one at a time, concurrency issues (race conditions, deadlocks) are mitigated.
But what happens if the Actor creates a future to do the work associated with a message? Since the future is async, the Actor could begin processing the next several messages while the future associated with the prior message is still running. Wouldn't this potentially create race conditions? How can one safely use futures within an Actor's receive() method to do long running tasks?
Introduction. In the Scala Standard Library, a Future is a data structure used to retrieve the result of some concurrent operation. This result can be accessed synchronously (blocking) or asynchronously (non-blocking). To be able to use this from Java, Akka provides a java friendly interface in akka.
A Future is a placeholder object for a value that may not yet exist. Generally, the value of the Future is supplied concurrently and can subsequently be used. Composing concurrent tasks in this way tends to result in faster, asynchronous, non-blocking parallel code.
What is an Actor in Akka? An actor is essentially nothing more than an object that receives messages and takes actions to handle them. It is decoupled from the source of the message and its only responsibility is to properly recognize the type of message it has received and take action accordingly.
The good news is that Akka actors conceptually each have their own light-weight thread, which is completely shielded from the rest of the system. This means that instead of having to synchronize access using locks you can write your actor code without worrying about concurrency at all.
The safest way to use futures within an actor is to only ever use pipeTo
on the future and send its result as a message to an actor (possibly the same actor).
import akka.pattern.pipe
object MyActor {
def doItAsynchronously(implicit ec: ExecutionContext): Future[DoItResult] = {
/* ... */
}
}
class MyActor extends Actor {
import MyActor._
import context.dispatcher
def receive = {
case DoIt =>
doItAsynchronously.pipeTo(self)
case DoItResult =>
// Got a result from doing it
}
}
This ensures that you won't mutate any state within the actor.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With