Akka Actor "ask" and "Await" with TimeoutException

Tags: scala, akka, actor

I'm modeling a simple P2P network with Scala and Akka:

class Node() extends Peer with Actor {

  var peers: List[ActorRef] = List()

  def receive = {
    case _register(peer: ActorRef, p: Option[Int]) => {
      println("registering [" + peer + "] for [" + this + "]")
      peers = peer :: peers
    }
  }

}

sealed case class _register(val peer: ActorRef, var p: Option[Int] = None)

and then a simple network:

class Network() extends Actor {

  def this(name: String) = {

    this()

    val system = ActorSystem(name)

    val s1 = system.actorOf(Props(new Node()), name = "s1")
    val s2 = system.actorOf(Props(new Node()), name = "s2")

    val c1 = system.actorOf(Props(new Node()), name = "c1")
    val c2 = system.actorOf(Props(new Node()), name = "c2")
    val c3 = system.actorOf(Props(new Node()), name = "c3")
    val c4 = system.actorOf(Props(new Node()), name = "c4")

    implicit val timeout = Timeout(5 second)

    s1 ? _register(c1)
    s1 ? _register(c2)
    s1 ? _register(c3)
    val lastRegistered = s2 ? _register(c4)
    Await.ready(lastRegistered, timeout.duration)

    println("initialized nodes")
  }
}

The output that I'm getting is always like:

registering [Actor[akka://p2p/user/c1]] for [nl.cwi.crisp.examples.p2p.scala.Node@14b5f4a]
registering [Actor[akka://p2p/user/c2]] for [nl.cwi.crisp.examples.p2p.scala.Node@14b5f4a]
registering [Actor[akka://p2p/user/c3]] for [nl.cwi.crisp.examples.p2p.scala.Node@14b5f4a]
registering [Actor[akka://p2p/user/c4]] for [nl.cwi.crisp.examples.p2p.scala.Node@13c0b53]
[ERROR] [04/10/2012 22:07:04.34] [main-akka.actor.default-dispatcher-1] [akka://main/user/p2p] error while creating actor
java.util.concurrent.TimeoutException: Futures timed out after [5000] milliseconds
    at akka.dispatch.DefaultPromise.ready(Future.scala:834)
    at akka.dispatch.DefaultPromise.ready(Future.scala:811)
    at akka.dispatch.Await$.ready(Future.scala:64)
    at nl.cwi.crisp.examples.p2p.scala.Network.<init>(Node.scala:136)
    at nl.cwi.crisp.examples.p2p.scala.Main$$anonfun$11.apply(Node.scala:164)
    at nl.cwi.crisp.examples.p2p.scala.Main$$anonfun$11.apply(Node.scala:164)
    at akka.actor.ActorCell.newActor(ActorCell.scala:488)
    at akka.actor.ActorCell.create$1(ActorCell.scala:506)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:591)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:191)
    at akka.dispatch.Mailbox.run(Mailbox.scala:160)
    at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:505)
    at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
    at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:997)
    at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1495)
    at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)

I have followed the Futures section of the Akka reference documentation. Replacing Await.ready with Await.result has no effect. The log shows that the last registration was successful.

How should I fix this?

nobeh asked Apr 10 '12 20:04


2 Answers

You are waiting for a reply from the Node actor, but the Node actor never sends a message back to the sender ActorRef. The Future[Any] created by s1 ? _register(...) therefore never receives a response and never completes, which is why Await times out. You could add sender ! something inside the Node receive method to send a response; I'm not sure what that something should be in this case.
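A minimal sketch of that fix, assuming a recent classic Akka release (2.4 or later). If I remember correctly, the Akka 2.0 used in the question keeps Await in akka.dispatch and the duration DSL in akka.util.duration, and spells sender without parentheses. The Registered acknowledgement and the AskDemo object are hypothetical names that are not in the original code, and _register is renamed to Register only to follow Scala naming conventions.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// The question's _register message, renamed for this sketch.
final case class Register(peer: ActorRef, p: Option[Int] = None)

// Hypothetical acknowledgement message; any reply value would do.
case object Registered

class Node extends Actor {
  private var peers: List[ActorRef] = Nil

  def receive: Receive = {
    case Register(peer, _) =>
      peers = peer :: peers
      // This reply is what completes the Future returned by `?`.
      sender() ! Registered
  }
}

object AskDemo {
  // Registers one peer and blocks until the acknowledgement arrives.
  def run(): Any = {
    val system = ActorSystem("p2p")
    try {
      implicit val timeout: Timeout = Timeout(5.seconds)
      val s1 = system.actorOf(Props(new Node), "s1")
      val c1 = system.actorOf(Props(new Node), "c1")
      Await.result(s1 ? Register(c1), timeout.duration)
    } finally system.terminate()
  }
}
```

If the caller does not need a confirmation at all, sending with ! instead of ? avoids creating a Future that nobody completes.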

stew answered Nov 15 '22 09:11


stew's got it right, but you've got some worrisome code in your network actor:

val system = ActorSystem(name)

val s1 = system.actorOf(Props(new Node()), name = "s1")
val s2 = system.actorOf(Props(new Node()), name = "s2")

val c1 = system.actorOf(Props(new Node()), name = "c1")
val c2 = system.actorOf(Props(new Node()), name = "c2")
val c3 = system.actorOf(Props(new Node()), name = "c3")
val c4 = system.actorOf(Props(new Node()), name = "c4")

Why are you creating a new ActorSystem, and why are you creating top-level actors inside that actor system?

If you need access to the actor's system, you simply call:

context.system

And you should avoid creating top-level actors "just because", for the same reason that you shouldn't clutter the root of your file system by placing all your files there. To create child actors of Network, just do:

context.actorOf(...)

Right now you'll have a problem as soon as you create more than one Network actor in the same system, because each one will attempt to create top-level actors with the same names.
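Here is a rough sketch of a Network that owns its nodes as children, again assuming a recent classic Akka release (2.4 or later). The ListChildren message and the NetworkDemo object are hypothetical and exist only to make the result observable, and _register is renamed to Register. Because the children live under each Network's own path, two Network actors can coexist in one system without a name clash.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

final case class Register(peer: ActorRef)

// Hypothetical query used only to inspect the children in this sketch.
case object ListChildren

class Node extends Actor {
  private var peers: List[ActorRef] = Nil
  def receive: Receive = {
    case Register(peer) => peers = peer :: peers
  }
}

class Network extends Actor {
  // context.actorOf creates children of this actor, not top-level actors.
  private val s1 = context.actorOf(Props(new Node), "s1")
  private val s2 = context.actorOf(Props(new Node), "s2")
  private val clients =
    List("c1", "c2", "c3", "c4").map(n => context.actorOf(Props(new Node), n))

  // Fire-and-forget registration: nothing blocks inside the constructor.
  clients.take(3).foreach(c => s1 ! Register(c))
  s2 ! Register(clients(3))

  def receive: Receive = {
    case ListChildren =>
      sender() ! context.children.map(_.path.name).toList.sorted
  }
}

object NetworkDemo {
  // Starts two Network actors in one system and returns their child names.
  def run(): (List[String], List[String]) = {
    val system = ActorSystem("main")
    try {
      implicit val timeout: Timeout = Timeout(5.seconds)
      val a = system.actorOf(Props(new Network), "p2p-a")
      val b = system.actorOf(Props(new Network), "p2p-b")
      val fa = (a ? ListChildren).mapTo[List[String]]
      val fb = (b ? ListChildren).mapTo[List[String]]
      (Await.result(fa, timeout.duration), Await.result(fb, timeout.duration))
    } finally system.terminate()
  }
}
```

The sketch also drops the blocking Await from the actor's constructor, since blocking there stalls actor creation, which is where the stack trace in the question originates.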

Viktor Klang answered Nov 15 '22 07:11