I have an actor in Akka that will process messages to create certain entities. Some fields on these entities are computed based on the state of other entities in the database at the moment of creation.
I would like to avoid creating a race condition where the actor processes messages faster than the database is able to persist the entities. This may lead to inconsistent data, for example:
1) The actor creates a Foo and sends it to other actors for further processing and saving.
2) The actor is asked to create another Foo. Since the first one is not yet saved, the new one is created based on the old content of the DB, thereby creating a wrong Foo.
Now, this possibility is quite remote, since the creation of Foos will be triggered manually. But it is still conceivable that a double click may cause problems under high load. And who knows whether tomorrow Foos will be created automatically.
Hence, what I need is some way to tell the actor to wait, and to resume its operations only after confirmation that the Foos have been saved.
Is there a way to put an actor in idle state, and tell it to resume its operations after a while?
Basically, I would like to use the mailbox as a message queue, and have control over the processing speed of the queue.
In Akka, you can stop an actor by invoking the stop() method of either the ActorContext or the ActorSystem. The ActorContext is used to stop a child actor (or the actor itself), and the ActorSystem is used to stop a top-level actor. The actual termination of the actor is performed asynchronously.
An ActorRef is the identity or address of an Actor instance. It is valid only during the Actor’s lifetime and allows messages to be sent to that Actor instance.
2) postStop(): This method is called after the actor has stopped and is used to release resources. It may also be used for deregistering this actor. Messages sent to a stopped actor are redirected to the deadLetters of the ActorSystem.
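As a rough illustration of stopping and postStop(), here is a minimal sketch. The Worker class, its "stop" message and the onStop callback are hypothetical names chosen for the example, not part of Akka.

```scala
import akka.actor.Actor

// Hypothetical actor that releases a resource when it is stopped.
class Worker(onStop: () => Unit) extends Actor {
  def receive = {
    case "stop" => context.stop(self) // asynchronous: postStop() runs afterwards
    case _      => ()                 // ignore everything else in this sketch
  }

  // Called once the actor has stopped; release resources here.
  override def postStop(): Unit = onStop()
}
```

A top-level Worker could also be stopped from the outside with system.stop(ref), where system is the ActorSystem that created it and ref is its ActorRef.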
No, you cannot suspend an actor: actors always pull messages from their mailbox as quickly as possible. This leaves only the possibility that incoming requests are stashed away, to be processed later:
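import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import akka.actor.{Actor, ActorRef, ReceiveTimeout, Stash}

// Request, Persist, Persisted and doWork() are placeholders for your own protocol and logic.
class A(db: ActorRef) extends Actor with Stash {
  def receive = {
    case Request =>
      doWork()
      db ! Persist
      // Fail if no message (in particular no confirmation) arrives in time.
      context.setReceiveTimeout(5.seconds)
      context.become({
        case Request => stash() // hold further requests until the write is confirmed
        case Persisted =>
          context.setReceiveTimeout(Duration.Undefined) // switch the timeout off again
          context.unbecome()
          unstashAll()
        case ReceiveTimeout => throw new TimeoutException("not persisted")
      }, discardOld = false)
  }
}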
Please note that message delivery is not guaranteed (or the database may be down) and therefore the timeout is recommended practice.
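What happens after the TimeoutException is thrown is decided by the actor's supervisor. Akka's default strategy already restarts the child on an Exception; the sketch below only makes that choice explicit. Parent, workerProps and the retry limits are assumptions made for the illustration.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import akka.actor.{Actor, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Escalate, Restart}

// Hypothetical parent that supervises the stashing actor.
class Parent(workerProps: Props) extends Actor {
  // Restart the worker when persistence timed out, escalate anything else.
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
      case _: TimeoutException => Restart
      case _                   => Escalate
    }

  private val worker = context.actorOf(workerProps, "worker")

  def receive = {
    case msg => worker.forward(msg) // keep the original sender
  }
}
```

After a restart the worker starts again from its initial receive behavior, and the Stash trait's preRestart puts the stashed messages back into the mailbox, so pending requests are not lost.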
This problem mostly shows up when the actor model and the domain model are not well aligned: the actor is the unit of consistency, but in your use case a consistent view requires an up-to-date external entity (the database) for the actor to do the right thing. I cannot recommend a solution without knowing more about the use case, but try to remodel your problem taking this into account.
It turns out that this only requires a few lines. This is the solution I came up with, which agrees with pagoda_5b's suggestion:
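import akka.actor.{Actor, ActorRef, Stash}

class QueueingActor(nextActor: ActorRef) extends Actor with Stash {
  import QueueingActor._

  def receive = {
    case message =>
      // Pass one message on, then stash everything else until Resume arrives.
      context.become({
        case Resume =>
          unstashAll()
          context.unbecome()
        case _ => stash()
      }, discardOld = false)
      nextActor ! message
  }
}

object QueueingActor {
  // A case object, so that the `case Resume` pattern above matches the message that is sent.
  case object Resume
}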
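The queue is only released when something sends Resume back. A minimal sketch of the receiving side, assuming a blocking write: SavingActor, save and ack are hypothetical names, with ack standing in for QueueingActor.Resume so that the block compiles on its own.

```scala
import akka.actor.Actor

// Hypothetical downstream actor. `save` stands in for a blocking database write,
// `ack` for the message that releases the queue (e.g. QueueingActor.Resume).
class SavingActor(save: Any => Unit, ack: Any) extends Actor {
  def receive = {
    case entity =>
      save(entity)   // assumed to return only once the entity is persisted
      sender() ! ack // the sender is the actor that passed the entity on
  }
}
```

Because QueueingActor uses ! from inside the actor, sender() in SavingActor is the QueueingActor itself. If save throws, no acknowledgement is sent and the queue stays blocked, which is the situation the receive timeout in the other answer guards against.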