 

Akka, futures and critical sections

Let's say we have an Akka actor that keeps its internal state in a var.

class FooActor extends Actor {
  private var state: Int = 0

  def receive = { ... }
}

Suppose the receive handler invokes an operation that returns a future. We map it using the actor's dispatcher as the execution context, and finally we register an onSuccess callback that alters the actor state.

import context.dispatcher
def receive = {
  case "Hello" => requestSomething() // assume Future[String]
    .map(_.size)
    .onSuccess { case i => state = i }
}

Is it thread-safe to alter the state of the actor from the onSuccess callback, even using the actor dispatcher as execution context?

asked Dec 19 '22 12:12 by Alvaro Polo


2 Answers

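No, it's not (see the Akka 2.3.4 documentation). The onSuccess callback runs on a dispatcher thread, outside the actor's message processing, so it can execute concurrently with receive handling another message.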

What you have to do in this case is send a message to self and alter the state when the actor processes that message. If you need ordering, you can use stash and become. Something like this:

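import akka.actor.{Actor, Stash, Status}
import akka.pattern.pipe

case class StateUpdate(i: Int)

class FooActor extends Actor with Stash {
  import context.dispatcher // execution context for map and pipeTo

  private var state: Int = 0

  def receive: Receive = ready

  def ready: Receive = {
    case "Hello" =>
      requestSomething() // assume Future[String]
        .map(s => StateUpdate(s.length))
        .pipeTo(self) // the result comes back as an ordinary message
      context.become(busy)
  }

  def busy: Receive = {
    case StateUpdate(i) =>
      state = i // safe: runs inside the actor's own message processing
      unstashAll()
      context.become(ready)
    case Status.Failure(t) => // the future failed; go back to ready so the actor is not stuck
      unstashAll()
      context.become(ready)
    case _ =>
      stash() // hold everything else until the update arrives
  }
}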

Of course, this is a simplistic implementation. You will probably want to handle timeouts and similar cases to avoid leaving your actor stuck in the busy state.
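As a rough illustration of the timeout handling mentioned above, here is a minimal sketch. It assumes the Akka classic API; the names TimedFooActor, RequestFailed, RequestTimedOut and GetState are hypothetical and not part of the original answer, and requestSomething is passed in as a constructor parameter only to keep the sketch self-contained. Each request carries an id so that a late reply or a late timeout from an abandoned request can be recognised and dropped.

```scala
import akka.actor.{Actor, Cancellable, Stash}
import akka.pattern.pipe
import scala.concurrent.Future
import scala.concurrent.duration._

// All message types below are hypothetical names chosen for this sketch.
final case class StateUpdate(id: Long, i: Int)
final case class RequestFailed(id: Long)
final case class RequestTimedOut(id: Long)
case object GetState // only here so the state can be observed from outside

// requestSomething is injected (an assumption) so the example stands alone.
class TimedFooActor(requestSomething: () => Future[String],
                    timeout: FiniteDuration = 5.seconds) extends Actor with Stash {
  import context.dispatcher // execution context for map, recover, pipeTo, scheduler

  private var state: Int = 0
  private var lastId: Long = 0L

  def receive: Receive = ready

  def ready: Receive = {
    case "Hello" =>
      lastId += 1
      val id = lastId // immutable copy, safe to close over
      requestSomething()
        .map(s => StateUpdate(id, s.length))
        .recover { case _ => RequestFailed(id) }
        .pipeTo(self)
      // Schedule a message to ourselves in case no reply arrives in time.
      val timer = context.system.scheduler.scheduleOnce(timeout, self, RequestTimedOut(id))
      context.become(busy(id, timer))
    case GetState => sender() ! state
    // Replies or timeouts belonging to an abandoned request are dropped.
    case _: StateUpdate | _: RequestFailed | _: RequestTimedOut => ()
  }

  def busy(id: Long, timer: Cancellable): Receive = {
    case StateUpdate(`id`, i) =>
      timer.cancel()
      state = i
      backToReady()
    case RequestFailed(`id`) =>
      timer.cancel()
      backToReady()
    case RequestTimedOut(`id`) =>
      backToReady() // give up on this request, keep the old state
    // Same message types but a different id: stale, ignore.
    case _: StateUpdate | _: RequestFailed | _: RequestTimedOut => ()
    case _ => stash()
  }

  private def backToReady(): Unit = {
    unstashAll()
    context.become(ready)
  }
}
```

With this shape the actor leaves the busy state on success, on failure, or after the timeout, whichever comes first. Whether a timed-out request should keep the old state, as here, or reset it is a design choice that depends on the application.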

If you don't need ordering guarantees on your state:

import akka.actor.Actor
import akka.pattern.pipe
case class StateUpdate(i: Int)

class FooActor extends Actor {
  import context.dispatcher // execution context for map and pipeTo
  private var state: Int = 0

  def receive = {
    case "Hello" =>
      requestSomething() // assume Future[String]
        .map(s => StateUpdate(s.length))
        .pipeTo(self)
    case StateUpdate(i) => state = i
  }
}

But then the actor state may not be the length of the last string received, since several requests can be in flight at once and their futures can complete out of order.
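If stashing is too heavy but stale results should not overwrite newer ones, one possible middle ground is to tag each request with a sequence number and apply an update only when it is newer than the last one applied. The sketch below is an assumption layered on the answer, not part of it: LatestWinsActor, TaggedUpdate and GetState are hypothetical names, and requestSomething is injected as a constructor parameter to keep the example self-contained.

```scala
import akka.actor.{Actor, Status}
import akka.pattern.pipe
import scala.concurrent.Future

// Hypothetical message types for this sketch.
final case class TaggedUpdate(seq: Long, i: Int)
case object GetState // only here so the state can be observed from outside

class LatestWinsActor(requestSomething: () => Future[String]) extends Actor {
  import context.dispatcher // execution context for map and pipeTo

  private var state: Int = 0
  private var nextSeq: Long = 0L    // sequence number of the most recent request
  private var appliedSeq: Long = 0L // sequence number of the update held in state

  def receive: Receive = {
    case "Hello" =>
      nextSeq += 1
      val seq = nextSeq // immutable copy, safe to close over
      requestSomething().map(s => TaggedUpdate(seq, s.length)).pipeTo(self)
    case TaggedUpdate(seq, i) if seq > appliedSeq =>
      appliedSeq = seq
      state = i
    case _: TaggedUpdate => () // older than what we already hold, drop it
    case Status.Failure(_) => () // a request failed, keep the current state
    case GetState => sender() ! state
  }
}
```

Both counters are only read and written inside receive, so they get the same single-threaded guarantee as state itself. The actor stays responsive while requests are in flight, at the cost of state briefly reflecting an older request until a newer reply arrives.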

answered Jan 05 '23 05:01 by Jean


To support Jean's answer, here is the example from the docs:

class MyActor extends Actor {
    var state = ...
    def receive = {
        case _ =>
        //Wrongs

        // Very bad, shared mutable state,
        // will break your application in weird ways
        Future {state = NewState}
        anotherActor ? message onSuccess {
            r => state = r
        }

        // Very bad, "sender" changes for every message,
        // shared mutable state bug
        Future {expensiveCalculation(sender())}

        //Rights

        // Completely safe, "self" is OK to close over
        // and it's an ActorRef, which is thread-safe
        Future {expensiveCalculation()} onComplete {
            f => self ! f.value.get
        }

        // Completely safe, we close over a fixed value
        // and it's an ActorRef, which is thread-safe
        val currentSender = sender()
        Future {expensiveCalculation(currentSender)}
    }
}
answered Jan 05 '23 04:01 by LMeyer