Update: Source code for this question can be found here. Courtesy of danluu.
Is there a standard way of replacing actorFor now that it has been deprecated?
Everything I found involves actorSelection, but getting 3 dependencies set up with a selection is a real pita. Most of the discussion online just says not to use actorFor, but all the books on akka still make use of it (since they target pre-2.2 akka).
For example, let's say I have two actors, Pilot and Copilot, both children of the Plane actor. Here is how they currently get a reference to each other (Akka Concurrency, Derek Wyatt, 2013):
// Plane.scala
...
override def preStart() {
  // Get our children going.
  //... create pilot/copilot (in a separate function)
  startPeople()
  // Tell them system is ready to go
  context.actorFor("Pilots/" + pilotName) ! ReadyToGo
  context.actorFor("Pilots/" + copilotName) ! ReadyToGo
}

// Pilot.scala
def receive = {
  case ReadyToGo =>
    val copilotName = context.system.settings.config.getString("zzz.akka.avionics.flightcrew.copilotName")
    copilot = context.actorFor("../" + copilotName)
}

// Copilot.scala
def receive = {
  case ReadyToGo =>
    pilot = context.actorFor("../" + pilotName)
    // watch out for pilot dying - take over control when he does
    context.watch(pilot)
  case Controls(ctr) =>
    controls = ctr
  case Terminated(_) =>
    // Pilot died!
    plane ! GiveMeControl
}
How would I do this without actorFor? Oh, and DI constructor injection would not work here, since both pilot and copilot need to know about each other.
Thanks.
P.S.: Here is the content of the startPeople() method, not sure if it matters:
def startPeople() {
  val plane = self
  val controls: ActorRef = actorForControls("ControlSurfaces")
  val autopilot: ActorRef = actorForControls("Autopilot")
  val altimeter: ActorRef = actorForControls("Altimeter")
  val people = context.actorOf(Props(new IsolatedStopSupervisor with OneForOneStrategyFactory
  {
    override def childStarter() {
      // These children get implicitly added to the hierarchy
      context.actorOf(Props(newCopilot(plane, autopilot, altimeter)), copilotName)
      context.actorOf(Props(newPilot(plane, autopilot, controls, altimeter)), pilotName)
    }
  }), "Pilots")
  // Use the default strategy here, which restarts indefinitely
  context.actorOf(Props(newLeadFlightAttendant), attendantName)
  Await.result(people ? WaitForStart, 1.second)
}
Thanks to Derek Wyatt (author) for the great answer.
As an example, here is how I used Futures with map notation in the Copilot actor to acquire a dependency on the Pilot actor:
package zzz.akka.avionics

import akka.actor._
import zzz.akka.avionics.Pilots.ReadyToGo
import zzz.akka.avionics.Plane.GiveMeControl
import akka.actor.Terminated
import zzz.akka.avionics.Plane.Controls
import scala.concurrent.Future
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern.pipe

/** Copilot is a fail-over for the pilot.
  * Created by Andriy Drozdyuk on 05-Apr-14.
  */
class Copilot(plane: ActorRef, autopilot: ActorRef, altimeter: ActorRef) extends Actor with Stash {
  // Implicit execution context for futures
  implicit val ec = context.dispatcher
  // Implicit timeout for getting dependencies
  implicit val timeout = Timeout(1.second)

  val conf = context.system.settings.config
  var pilotName = conf.getString("zzz.akka.avionics.flightCrew.pilotName")
  var controls: ActorRef = context.system.deadLetters

  // Helps us get pilot dependency
  trait PilotAcquisition
  case class PilotAcquired(pilot: ActorRef) extends PilotAcquisition
  case class PilotNotAcquired(t: Throwable) extends PilotAcquisition

  // Acquire the pilot
  // Send either PilotAcquired or PilotNotAcquired message to self
  acquirePilot pipeTo self

  def acquirePilot: Future[PilotAcquisition] = {
    context.actorSelection("../" + pilotName).resolveOne() map {
      pilot => PilotAcquired(pilot)
    } recover {
      case t: Throwable => PilotNotAcquired(t)
    }
  }

  def receive: Receive = waitingForDependencies

  def waitingForDependencies: Receive = {
    case PilotAcquired(pilot) =>
      // Get all the messages we stashed and receive them
      unstashAll()
      // pass all our acquired dependencies in
      context.become(operational(pilot))
    case PilotNotAcquired(t) => throw new IllegalStateException(s"Failed to instantiate: $t")
    // Any other message save for later
    case _ => stash()
  }

  // All our dependencies have been acquired
  def operational(pilot: ActorRef): Receive = {
    case ReadyToGo =>
      // Start watch on the pilot in case he kicks it
      context.watch(pilot)
    case Controls(ctr) =>
      controls = ctr
    case Terminated(_) =>
      // Pilot died!
      plane ! GiveMeControl
  }
}
The ActorSystem is the root of the actor hierarchy: a hierarchical group of actors that share common configuration, e.g. dispatchers, deployments, remote capabilities and addresses. It is also the entry point for creating or looking up actors.
An ActorContext additionally provides access to the Actor's own identity (getSelf), the ActorSystem it is part of, methods for querying the list of child Actors it created, access to Terminated, and timed message scheduling.
In Akka, you can stop Actors by invoking the stop() method of either the ActorContext or the ActorSystem class. ActorContext is used to stop a child actor and ActorSystem is used to stop a top-level Actor. The actual termination of the actor is performed asynchronously.
Akka Actor tell() method: it works on a "fire-and-forget" basis. You can also use ! (bang) to send a message. This is the preferred way of sending messages.
ActorSelection is really the way to replace actorFor. However, as I've already stated in the book, using Actor paths or selections can make your app quite brittle.
context.actorSelection("../someactor") ! ReadyToGo
context.actorSelection(self.path / "someChild" / "someGrandchild") ! ReadyToGo
Change your actor hierarchy, and your app starts failing. However, at least with ActorSelection you get some timeout errors instead of just getting things sent to the dead letter office.
Now, if you want to start grabbing references, that's a different story and I suggest doing it with Futures.
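import akka.actor._
import akka.pattern.pipe
import akka.util.Timeout
import scala.concurrent.duration._

// Messages the actor pipes to itself once the lookups finish
case class ActorsLocated(a: ActorRef, b: ActorRef, c: ActorRef)
case class ActorLocatingFailed(reason: Throwable)

class MyActor extends Actor with Stash {
  // The Future combinators need an ExecutionContext and resolveOne() needs a Timeout
  import context.dispatcher
  implicit val timeout = Timeout(1.second) // arbitrary value, tune as needed

  // someRootActor and someOtherRootActor are placeholders, assumed to be ActorPath values
  val actors = for {
    actorA <- context.actorSelection(someRootActor / List("child1", "grandchild", "greatGrandchild")).resolveOne()
    actorB <- context.actorSelection(someRootActor / List("child2")).resolveOne()
    actorC <- context.actorSelection(someOtherRootActor / List("childC")).resolveOne()
  } yield ActorsLocated(actorA, actorB, actorC)

  actors recover {
    case t: Throwable =>
      ActorLocatingFailed(t)
  } pipeTo self

  val uninitialized: Receive = {
    case ActorsLocated(a, b, c) =>
      unstashAll()
      context.become(initialized(a, b, c))
    case ActorLocatingFailed(reason) =>
      throw new IllegalStateException(s"Failed to initialize: $reason")
    case _ =>
      stash()
  }

  def initialized(a: ActorRef, b: ActorRef, c: ActorRef): Receive = {
    // do your stuff here
  }

  def receive = uninitialized
}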
Now you have a fully asynchronous startup for your Actor that grabs all of its dependencies properly. On restart, you're good to go.
I think you have two possibilities:
"Injecting" the actors via message (e.g. ReadyToGo):
coPilotActor ! ReadyToGo(pilotActor)
pilotActor ! ReadyToGo(copilotActor)
Using ActorSelection:
context.actorSelection(actorPath) ! Identify(<correlation_id>)
// ...
// response:
case ActorIdentity(<correlation_id>, None) => // actor ref not found
// or
case ActorIdentity(<correlation_id>, Some(ref)) => // actor ref found
Because you are creating the two actors in the plane, you may be able to find a solution where you don't have to use actor selection at all.
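To make the first possibility concrete, here is a minimal sketch of injecting the references via a message. It is a simplification, not the book's code: it assumes the Plane creates Pilot and Copilot directly (the question puts them under an intermediate "Pilots" supervisor), and the ReadyToGo(peer) and GiveMeControl messages, as well as the child names "pilot" and "copilot", are hypothetical names chosen for illustration.

```scala
import akka.actor.{Actor, ActorRef, Props, Terminated}

// Hypothetical messages: ReadyToGo carries the peer's reference
final case class ReadyToGo(peer: ActorRef)
case object GiveMeControl

class Pilot extends Actor {
  // Points at dead letters until the plane tells us who the copilot is
  private var copilot: ActorRef = context.system.deadLetters

  def receive: Receive = {
    case ReadyToGo(peer) => copilot = peer
  }
}

class Copilot extends Actor {
  private var pilot: ActorRef = context.system.deadLetters

  def receive: Receive = {
    case ReadyToGo(peer) =>
      pilot = peer
      // Watch the pilot so we can take over when it terminates
      context.watch(pilot)
    case Terminated(ref) if ref == pilot =>
      // In this simplified hierarchy the parent is the plane
      context.parent ! GiveMeControl
  }
}

class Plane extends Actor {
  override def preStart(): Unit = {
    // The plane owns both references, so it can hand each actor its peer
    val pilot = context.actorOf(Props(classOf[Pilot]), "pilot")
    val copilot = context.actorOf(Props(classOf[Copilot]), "copilot")
    pilot ! ReadyToGo(copilot)
    copilot ! ReadyToGo(pilot)
  }

  def receive: Receive = {
    case GiveMeControl => // hand the controls to sender() here
  }
}
```

With this shape no path lookup is needed, so renaming or moving the actors does not break the wiring. The trade-off is that the parent has to hold both references, which takes extra plumbing when the children are created inside a separate supervisor as in the question.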