When receiving messages, Akka actors process one message at a time, finishing the handler for one message before moving on to the next.
This works well for synchronous/blocking tasks. However, if I want to perform an asynchronous/non-blocking request, Akka will continue processing without waiting for the task to complete.
For example:
def doThing(): Future[Unit] = ??? // non-blocking request here
def receive = {
  case DoThing => doThing() pipeTo sender()
}
This will call doThing() and start processing the future, but will not wait for it to complete before processing the next message. It will simply execute the next messages in the queue as fast as possible.
In essence, it appears that Akka considers "returning a future" to be "finished processing" and moves on to the next message.
In order to process one message at a time, it appears I need to actively block the actor thread to stop it from doing so:
def receive = {
  // Await.result requires a timeout; Duration.Inf waits indefinitely
  case DoThing => sender() ! blocking(Await.result(doThing(), Duration.Inf))
}
This feels like a very wrong approach - It's artificially blocking a thread in code that should otherwise be completely non-blocking.
When comparing Akka to, say, Elixir actors, we can easily avoid this problem in the first place by using a tail call to request the next message without needing to artificially block.
Is there any way in Akka to either:
a) Wait for a Future to complete before processing the next message, without blocking the thread?
b) Use an explicit tail call or some other mechanism to use a pull-based workflow instead of a push-based one?
As was suggested in the comments, you can use the Stash trait (http://doc.akka.io/docs/akka/current/scala/actors.html#Stash) to store incoming messages while you wait for the Future to resolve.
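You need to save the current sender up front so that you don't close over the actor's sender reference inside the Future, where it may already refer to the sender of a later message by the time the Future completes. You can achieve this through a simple case class like the one defined below.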
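import akka.actor.{Actor, ActorRef, Stash}
import akka.pattern.pipe
import scala.concurrent.Future

class MyActor extends Actor with Stash {
  import context.dispatcher

  // Save the correct sender ref by using something like
  // case class WrappedFuture(senderRef: ActorRef, result: Any)
  def doThing(): Future[WrappedFuture] = ???

  override def receive: Receive = {
    case msg: DoThing =>
      // Send the wrapped result back to this actor instead of replying directly.
      // Note: a failed Future is piped as akka.actor.Status.Failure,
      // which the behavior below does not handle.
      doThing() pipeTo self
      // Until the result arrives, stash every further DoThing
      context.become({
        case WrappedFuture(senderRef, result) =>
          senderRef ! result
          unstashAll()
          context.unbecome()
        case newMsg: DoThing =>
          stash()
      }, discardOld = false)
  }
}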
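The snippet above leaves doThing() unimplemented and has no case for a failed Future. In that situation pipeTo delivers a Status.Failure to self, and the actor would keep stashing. What follows is a minimal sketch of one way to fill in both gaps, not the original author's code. The names SequentialActor, Outcome, Done, Failed and the work parameter are hypothetical and only serve the illustration; DoThing is assumed to be a case object, as in the question.

```scala
import akka.actor.{Actor, ActorRef, Stash, Status}
import akka.pattern.pipe
import scala.concurrent.Future

case object DoThing

// Hypothetical wrapper messages: each carries the original sender
sealed trait Outcome
final case class Done(replyTo: ActorRef, result: Any) extends Outcome
final case class Failed(replyTo: ActorRef, cause: Throwable) extends Outcome

// `work` stands in for the non-blocking request (an assumption of this sketch)
class SequentialActor(work: () => Future[Any]) extends Actor with Stash {
  import context.dispatcher

  def receive: Receive = idle

  private def idle: Receive = {
    case DoThing =>
      // Read sender() now, while still handling this message
      val replyTo = sender()
      val outcome: Future[Outcome] =
        work()
          .map(result => Done(replyTo, result): Outcome)
          .recover { case cause => Failed(replyTo, cause) }
      outcome.pipeTo(self)
      context.become(busy)
  }

  private def busy: Receive = {
    case Done(replyTo, result) =>
      replyTo ! result
      resume()
    case Failed(replyTo, cause) =>
      // Report the failure instead of staying stuck in `busy`
      replyTo ! Status.Failure(cause)
      resume()
    case _ =>
      // Anything else waits until the current request has finished
      stash()
  }

  private def resume(): Unit = {
    unstashAll()
    context.become(idle)
  }
}
```

Because the Future is always converted into either Done or Failed before it is piped, the actor returns to idle whether the request succeeds or fails, and no thread is blocked while it waits.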
With Akka Streams, you could use mapAsync:
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.Future
implicit val system = ActorSystem("ThingDoer")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
case object DoThing
def doThing(): Future[Unit] = Future {
  println("doing its thing")
}
Source((1 to 10).map(_ => DoThing))
  .mapAsync(parallelism = 1)(_ => doThing())
  .runWith(Sink.ignore)
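To see why parallelism = 1 gives one-at-a-time processing without blocking a thread, it may help to look at the same idea using only the Scala standard library. The sketch below chains each Future off the previous one, so a task starts only after the one before it has completed. It illustrates the effect, not how Akka Streams implements mapAsync, and the names SequentialFutures and runSequentially are made up for this example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequentialFutures {
  // Runs `task` for each input in order; the next task is started
  // only inside flatMap, i.e. after the previous Future has completed.
  def runSequentially[A, B](inputs: Seq[A])(task: A => Future[B]): Future[Vector[B]] =
    inputs.foldLeft(Future.successful(Vector.empty[B])) { (acc, input) =>
      acc.flatMap(done => task(input).map(result => done :+ result))
    }

  def main(args: Array[String]): Unit = {
    val results = runSequentially(Seq(1, 2, 3))(n => Future(n * 2))
    // Await is used here only so the demo can print before the JVM exits
    println(Await.result(results, 5.seconds)) // prints Vector(2, 4, 6)
  }
}
```

Nothing waits on a thread between tasks: each step is registered as a callback on the previous Future, which is the pull-like behavior the question asks about.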