Pausing an actor in Akka

I have an actor in Akka that will process messages to create certain entities. Some fields on these entities are computed based on the state of other entities in the database at the moment of creation.

I would like to avoid a race condition where the actor processes messages faster than the database can persist the entities. This could lead to inconsistent data, for example:

  • Actor creates a Foo and sends it to other actors for further processing and saving
  • The actor is asked to create another Foo. Since the first one is not yet saved, the new one is created based on the old content of the DB, thereby creating a wrong Foo.

Now, this possibility is quite remote, since the creation of the Foos is triggered manually. But it is still conceivable that a double click could cause problems under high load. And Foos may well be created automatically in the future.

Hence, what I need is some way to tell the actor to wait, and resume its operations only after confirmation that the Foos have been saved.

Is there a way to put an actor into an idle state, and tell it to resume its operations later?

Basically, I would like to use the mailbox as a message queue, and have control over the processing speed of the queue.

Andrea asked Jan 28 '13 14:01


2 Answers

No, you cannot suspend an actor: actors always pull messages from their mailbox as quickly as possible. The only option is to stash incoming requests away and process them later:

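import java.util.concurrent.TimeoutException
import akka.actor.{Actor, ActorRef, ReceiveTimeout, Stash}
import scala.concurrent.duration._

// Request, Persist, Persisted and doWork() are assumed to be defined elsewhere
class A(db: ActorRef) extends Actor with Stash {
  def receive = {
    case Request =>
      doWork()
      db ! Persist
      context.setReceiveTimeout(5.seconds)
      context.become({
        case Request        => stash() // hold new requests until the save is confirmed
        case Persisted      =>
          context.setReceiveTimeout(Duration.Undefined) // switch the timeout off again
          context.unbecome()
          unstashAll()
        case ReceiveTimeout => throw new TimeoutException("not persisted")
      }, discardOld = false)
  }
}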

Please note that message delivery is not guaranteed (and the database may be down), so using a timeout is recommended practice.
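
Throwing from the ReceiveTimeout case hands the failure to the actor's supervisor. As a rough sketch, assuming Akka classic 2.3 or later, a parent could cap how often it restarts the stashing actor when persistence keeps timing out. `Parent` and `workerProps` are hypothetical names used for illustration; they are not part of the answer above.

```scala
import java.util.concurrent.TimeoutException
import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Escalate, Restart}
import scala.concurrent.duration._

// Hypothetical parent; workerProps is assumed to create the stashing actor.
class Parent(workerProps: Props) extends Actor {
  // Restart the worker at most 3 times per minute when persistence times out;
  // escalate every other failure to this actor's own supervisor.
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
      case _: TimeoutException => Restart
      case _                   => Escalate
    }

  private val worker: ActorRef = context.actorOf(workerProps, "worker")

  // Pass every message through to the supervised worker, keeping the original sender
  def receive = {
    case msg => worker forward msg
  }
}
```

After a restart the worker starts again with its initial behavior. According to the Akka documentation, the Stash trait's preRestart calls unstashAll(), so stashed requests should go back into the mailbox rather than being dropped.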

The underlying problem

This problem shows up mostly in cases where the actor model and the domain model are not well aligned: the actor is the unit of consistency, but in your use-case a consistent view requires an up-to-date external entity (the database) for the actor to do the right thing. I cannot recommend a solution without knowing more about the use-case, but try to remodel your problem taking this into account.

Roland Kuhn answered Oct 14 '22 08:10


It turns out that this only requires a few lines. This is the solution I came up with, which agrees with pagoda_5b's suggestion:

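import akka.actor.{Actor, ActorRef, Stash}

class QueueingActor(nextActor: ActorRef) extends Actor with Stash {
  import QueueingActor._

  def receive = {
    case message =>
      // Stash everything until the downstream actor sends Resume
      context.become({
        case Resume =>
          unstashAll()
          context.unbecome()
        case _ => stash()
      }, discardOld = false)
      nextActor ! message
  }
}

object QueueingActor {
  // A case object, so that the pattern `case Resume` matches the message that is sent
  case object Resume
}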
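
For completeness, here is a minimal sketch of how the downstream side could signal completion, assuming Akka classic 2.4 or later. The `Saver` actor and the `Demo` object are hypothetical and only stand in for whatever actually persists the Foos. The queueing actor is repeated (with `Resume` as a case object) so that the example compiles on its own.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props, Stash}

object QueueingActor {
  case object Resume
}

class QueueingActor(nextActor: ActorRef) extends Actor with Stash {
  import QueueingActor._

  def receive = {
    case message =>
      // Hold back further messages until Resume arrives
      context.become({
        case Resume =>
          unstashAll()
          context.unbecome()
        case _ => stash()
      }, discardOld = false)
      nextActor ! message
  }
}

// Hypothetical downstream actor: "saves" the message, then tells the sender to continue.
class Saver extends Actor {
  def receive = {
    case foo =>
      println(s"saved $foo") // a real implementation would write to the database here
      sender() ! QueueingActor.Resume
  }
}

object Demo extends App {
  val system = ActorSystem("demo")
  val saver  = system.actorOf(Props(new Saver), "saver")
  val queue  = system.actorOf(Props(new QueueingActor(saver)), "queue")

  // Each message is only handed to the saver after the previous one was confirmed
  Seq("foo-1", "foo-2", "foo-3").foreach(queue ! _)

  Thread.sleep(500) // crude wait so the demo can finish before shutdown
  system.terminate()
}
```

Because `nextActor ! message` is sent from inside the queueing actor, `sender()` in the downstream actor is the queueing actor itself, which is why replying to the sender is enough. If the message passes through further actors before being saved, the last one would need an explicit reference to the queueing actor instead.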
Andrea answered Oct 14 '22 09:10