Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do you replace actorFor?

Tags:

scala

akka

Update: Source code for this question can be found here. Courtesy of danluu.

Is there a standard way of replacing actorFor now that it has been deprecated? Everything I found involves actorSelection - but getting 3 dependencies with setup with a selection is a real pita. Most of the discussion online just says not to use actorFor, but all the books on akka still make use of it (since they target pre-2.2 akka).

For example, let's say I have two actors Pilot and Copilot, both children of the Plane actor. Here is how they currently get a reference to each other (Akka Concurrency, Derek Wyatt, 2013):

// Plane.scala
...
override def preStart(){
    // Get our children going.

    //... create pilot/copilot (in a separate function)
    startPeople()

    // Tell them system is ready to go
    context.actorFor("Pilots/" + pilotName) ! ReadyToGo
    context.actorFor("Pilots/" + copilotName) ! ReadyToGo

// Pilot.scala
def receive = {
    case ReadyToGo =>
      val copilotName = context.system.settings.config.getString("zzz.akka.avionics.flightcrew.copilotName")
      copilot = context.actorFor("../" + copilotName)

// Copilot.scala
def receive = {
    case ReadyToGo =>
      pilot = context.actorFor("../" + pilotName)
      // watch out for pilot dying - take over control when he does
      context.watch(pilot)

    case Controls(ctr) =>
      controls = ctr

    case Terminated(_) =>
      // Pilot died!
      plane ! GiveMeControl

How would I do this without actorFor? Oh, and DI contructor injection would not work here - since both pilot and copilot need to know about each other.

Thanks.

P.S.: Here is the content of startPeople() method, not sure if it matters:

 def startPeople() {
    val plane = self

    val controls: ActorRef = actorForControls("ControlSurfaces")
    val autopilot: ActorRef = actorForControls("Autopilot")
    val altimeter: ActorRef = actorForControls("Altimeter")

    val people = context.actorOf(Props(new IsolatedStopSupervisor with OneForOneStrategyFactory
    {
      override def childStarter() {
        // These children get implicitly added to the hierarchy
        context.actorOf(Props(newCopilot(plane, autopilot, altimeter)), copilotName)
        context.actorOf(Props(newPilot(plane, autopilot, controls, altimeter)),  pilotName)
      }
    }), "Pilots")

    // Use the default strategy here, which restarts indefinitely
    context.actorOf(Props(newLeadFlightAttendant), attendantName)
    Await.result(people ? WaitForStart, 1.second)
  }

Update: Solved

Thanks to Derek Wyatt (author) for the great answer. As an example, here is how I used Futures with map notation, in oneCopilot actor to acquire a dependency on Pilot actor:

package zzz.akka.avionics

import akka.actor._
import zzz.akka.avionics.Pilots.ReadyToGo
import zzz.akka.avionics.Plane.GiveMeControl
import akka.actor.Terminated
import zzz.akka.avionics.Plane.Controls
import scala.concurrent.Future
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern.pipe

/** Copilot is a fail-over for the pilot.
 * Created by Andriy Drozdyuk on 05-Apr-14.
 */
class Copilot(plane: ActorRef, autopilot: ActorRef, altimeter: ActorRef) extends Actor with Stash {
  // Implicit execution context for futures
  implicit val ec = context.dispatcher
  // Implicit timeout for getting dependencies
  implicit val timeout = Timeout(1.second)

  val conf = context.system.settings.config
  var pilotName = conf.getString("zzz.akka.avionics.flightCrew.pilotName")

  var controls: ActorRef = context.system.deadLetters

  // Helps us get pilot dependency
  trait PilotAcquisition
  case class PilotAcquired(pilot: ActorRef) extends PilotAcquisition
  case class PilotNotAcquired(t: Throwable) extends PilotAcquisition

  // Acquire the pilot
  // Send either PilotAcquired or PilotNotAcquired message to self
  acquirePilot pipeTo self

  def acquirePilot: Future[PilotAcquisition] = {
    context.actorSelection("../" + pilotName).resolveOne() map {
      pilot => PilotAcquired(pilot)
    } recover {
      case t:Throwable => PilotNotAcquired(t)
    }
  }
  def receive: Receive = waitingForDependencies

  def waitingForDependencies: Receive = {
    case PilotAcquired(pilot) =>
      // Get all the messages we stashed and receive them
      unstashAll()
      // pass all our acquired dependencies in
      context.become(operational(pilot))

    case PilotNotAcquired(t) => throw new IllegalStateException(s"Failed to instantiate: $t")

    // Any other message save for later
    case _ => stash()
  }

  // All our dependencies have been acquired
  def operational(pilot: ActorRef) : Receive = {
    case ReadyToGo =>
      // Start watch on the pilot in case he kicks it
      context.watch(pilot)

    case Controls(ctr) =>
      controls = ctr

    case Terminated(_) =>
      // Pilot died!
      plane ! GiveMeControl

  }


}
like image 532
Andriy Drozdyuk Avatar asked Apr 09 '14 02:04

Andriy Drozdyuk


People also ask

What is ActorSystem?

The ActorSystem is a root actor in actors structure. An ActorSystem is a hierarchical group of actors which share common configuration, e.g. dispatchers, deployments, remote capabilities and addresses. It is also the entry point for creating or looking up actors.

What is ActorContext?

An ActorContext in addition provides access to the Actor’s own identity (“ getSelf ”), the ActorSystem it is part of, methods for querying the list of child Actors it created, access to Terminated and timed message scheduling.

How do you stop Akka actor?

In Akka, you can stop Actors by invoking the stop() method of either ActorContext or ActorSystem class. ActorContext is used to stop child actor and ActorSystem is used to stop top level Actor. The actual termination of the actor is performed asynchronously.

How can I send a message to an actor in Akka?

1) Akka Actor tell() Method It works on "fire-forget" approach. You can also use ! (bang) exclamation mark to send message. This is the preferred way of sending messages.


2 Answers

ActorSelection is really the way to replace actorFor. However, as I've already stated in the book, using Actor paths or selections can make your app quite brittle.

context.actorSelection("../someactor") ! ReadyToGo
context.actorSelection(self / "someChild" / "someGrandchild") ! ReadyToGo

Change your actor hierarchy, and your app starts failing. However, at least with ActorSelection you get some timeout errors instead of just getting things sent to the dead letter office.

Now, if you want to start grabbing references, that's a different story and I suggest doing it with Futures.

import akka.actor._
import akka.pattern.pipe

class MyActor extends Actor with Stash {
  val actors = for {
    actorA <- ActorSelection(someRootActor / List("child1", "grandchild", "greatGrandchild")).resolveOne()
    actorB <- ActorSelection(someRootActor / List("child2")).resolveOne()
    actorC <- ActorSelection(someOtherRootActor / List("childC")).resolveOne()
  } yield ActorsLocated(actorA, actorB, actorC)
  actors recover {
    case t: Throwable =>
      ActorLocatingFailed(t)
  } pipeTo self

  val uninitialized: Receive = {
    case ActorsLocated(a, b, c) =>
      unstashAll()
      context.become(initialized(a, b, c))
    case ActorLocatingFailed(reason) =>
      throw new IllegalStateException(s"Failed to initialize: $reason")
    case _ =>
      stash()
  }

  def initalized(a: ActorRef, b: ActorRef, c: ActorRef): Receive = {
    // do your stuff here
  }

  def receive = uninitialized
}

Now you have a fully asynchronous startup for your Actor that grabs all of its dependencies properly. On restart, you're good to go.

like image 188
Derek Wyatt Avatar answered Nov 03 '22 01:11

Derek Wyatt


I think you have two possibilities:

"Injecting" the actors via message (e.g. ReadyToGo):

coPilotActor ! ReadyToGo(pilotActor)
pilotActor ! ReadyToGo(copilotActor)

Using ActorSelection:

 context.actorSelection(actorPath) ! Identify(<correlation_id>)
 // ...
 // response:
 case ActorIdentity(<correlation_id>, None) => // actor ref not found
 // or
 case ActorIdentity(<correlation_id>, Some(ref)) => // actor ref found

Because you are creating the two actors in plane, maybe you find a solution where you don't have to use actor selection in this case.

like image 26
Christian Avatar answered Nov 03 '22 02:11

Christian