Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Akka - Common service actor: Identify or Extension

Lets say I have some commonly used by other actors service-layer actor. For example, an registry service that stores and retrieves domain objects:

case class DomainObject(id: UUID)

class Registry extends akka.actor.Actor {
  def receive: Receive = {
    case o: DomainObject => store(o) // save or update object

    case id: UUID => sender ! retrieve(id) // retrieve object and send it back
  }
}

I do not want to explicitly pass instance of such registry into all actors who may use it. Instead of it, I want them to be able to somehow 'locate' it.

For this I can think of two solutions:

  1. Identify message: each registry user actor knows registry actor name from some configuration and able to sent identification message to it. After AgentIdentity message is received back we are good to go:

    val registryName = ... // some name
    val registryId = ... // some id
    var registry = _
    
    def preStart() {
      context.actorSelection(registryName) ! Identify(registryId)
    }
    
    def receive: Receive = {
      case ActorIdentity(`registryId`, ref) => registry = ref
    }
    

    I do not like this way because right after user actor initialisation there is a phase when we do not know if there is a registry in system et all and thus do not know will we ever be able to operate or not.

  2. Akka Extensions: I can create an extension which would:

    a. create instance of Registry actor in given Actor System on initialization;

    b. return this actor to user who needs it via some method in Extension.

    object RegistryKey extends ExtensionKey[RegistryExtension]
    
    class RegistryExtesion(system: ExtendedActorSystem) extends RegistryKey {
      val registry = system.actorOf(Props[Registry], "registry")
    }
    

The question is: which method is better and are Akka Extesions can be used for this at all?

like image 470
Seigert Avatar asked Jun 09 '13 12:06

Seigert


People also ask

Are Akka actors single threaded?

Behind the scenes Akka will run sets of actors on sets of real threads, where typically many actors share one thread, and subsequent invocations of one actor may end up being processed on different threads.

What is ActorSystem in Akka?

Companion object ActorSystem An actor system is a hierarchical group of actors which share common configuration, e.g. dispatchers, deployments, remote capabilities and addresses. It is also the entry point for creating or looking up actors. There are several possibilities for creating actors (see akka.

Can an Akka actor stop other actors?

In Akka, you can stop Actors by invoking the stop() method of either ActorContext or ActorSystem class. ActorContext is used to stop child actor and ActorSystem is used to stop top level Actor. The actual termination of the actor is performed asynchronously.


2 Answers

I think the extension idea is a good one as long as your registry actor is always going to be in the same ActorSystem.

Alternatively, using actorSelection (adapted from Remote Lookup):

class RegistryClient extends Actor {
  val path = "/path/to/registry/actor"
  context.setReceiveTimeout(3.seconds)

  def sendIdentifyRequest(): Unit =
    context.actorSelection(path) ! Identify(path)

  def receive = {
    case ActorIdentity(`path`, Some(ref)) ⇒
      context.setReceiveTimeout(Duration.Undefined)
      context.become(active(ref))
    case ActorIdentity(`path`, None) ⇒
      throw new RuntimeException("Registry not found")
    case ReceiveTimeout ⇒ sendIdentifyRequest() 
  }

  def active(registry: ActorRef): Actor.Receive = {
    // use the registry
  }
}

This will work for remote or local actors.

Let's look at the extension solution. Actors are created asynchronously. Therefore your extension constructor won't fail when calling actorOf if the actor fails to initialize.

If you want to know for sure that the actor failed to initialize then one way to know is to ask the actor something that it will respond to and Await a response. The Await will throw a TimeoutException if the actor fails to respond.

class RegistryExtension(system: ExtendedActorSystem) extends Extension {
  val registry = system.actorOf(Props[Registry], "registry")
  implicit val timeout: Timeout = Timeout(500.millis)
  val f = registry ? "ping"    // Registry should case "ping" => "pong" 
  Await.result(f, 500.millis)  // Will throw a TimeoutException if registry fails 
                               // to respond
}

The TimeoutException will get thrown when you call RegistryExtension(system).registry the first time.

like image 80
sourcedelica Avatar answered Sep 30 '22 17:09

sourcedelica


How about the Cake Pattern or a Dependency Injection library such as subcut.

Derek Wyatt mentions DI in his book 'Akka Concurrency' instead of using too much actorFor to look up actors: http://www.artima.com/forums/flat.jsp?forum=289&thread=347118

like image 26
theon Avatar answered Sep 30 '22 17:09

theon