
Akka Actors - Creating Pool of Actors

Tags: scala, akka, actor

I wrote the following Akka actor code in Scala. It works fine when a single WorkerActor is created, but it fails silently when I try to create a pool of worker actors with round-robin routing. Any idea how to fix this? And how can I get more debug information printed?

import scala.collection.immutable.Map
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor._
import akka.pattern.ask
import akka.routing._
import akka.util.Timeout
import org.junit._
import org.junit.Assert._
import messaging.actors._

class MainEngineActorTest {

  @Test
  def testMainActor(): Unit = {
    val _system = ActorSystem("MainEngineActor")
    val master = _system.actorOf(Props[MainEngineActor], name = "EngineActor")

    println("Created Main Engine Actor")

    implicit val timeout = Timeout(5 seconds)

    val userID = new UserID("test1")

    println("Sending messages")

    for (i <- 1 to 10) {
      master ! "Hello"
      master ! "World"
    }
  }
}

class MainEngineActor extends Actor with ActorLogging {

  // works if we create only a single workerActor
  //val workerActors = context.actorOf(Props[WorkerActor], name = "WorkerActors")

  // Doesn't work when we create a pool of worker actors - how do we fix this?
  // why doesn't this work and why aren't any error messages printed?
  val workerActors = context.actorOf(RoundRobinPool(5).props(Props[WorkerActor]), name = "WorkerActors")

  def receive: Receive = {
    case request => {
      workerActors forward request
    }
  }
}

class WorkerActor extends Actor {

  def receive: Receive = {
    case request => {
      println("RequestReceived =" + request)
    }
  }
}
user3482479 asked Jan 26 '26 14:01



1 Answer

Try creating your pool like this instead:

val workerActors = context.actorOf(Props[WorkerActor].withRouter(RoundRobinPool(5)), name = "WorkerActors")

In addition, when this runs as a JUnit test, the program terminates before the child actors have a chance to receive the messages. I verified this by adding a Thread.sleep(5000) after the loop that sends the Hello and World messages to master. I then tweaked your code a little to use Akka's TestActorRef from akka-testkit, which forces everything to use the CallingThreadDispatcher so that execution is synchronous throughout the test, and everything works as expected. The two lines I changed are:

implicit val _system = ActorSystem("MainEngineActor")
val master = TestActorRef(new MainEngineActor())
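If you would rather keep the normal dispatcher and still avoid Thread.sleep, another option is to have the workers report back to a TestProbe and block on the probe until all messages have arrived. The following is only a rough sketch under some assumptions: akka-actor and akka-testkit (Akka 2.3 or later) are on the classpath, and ReportingWorker, PooledEngine and PoolDemo are hypothetical names I made up for illustration. They are variants of your WorkerActor and MainEngineActor that take a listener ActorRef, not your original classes.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.routing.RoundRobinPool
import akka.testkit.{TestKit, TestProbe}
import scala.concurrent.duration._

// Hypothetical variant of WorkerActor: prints the request and also
// reports it to a listener so that a test can observe it.
class ReportingWorker(listener: ActorRef) extends Actor {
  def receive: Receive = {
    case request =>
      println("RequestReceived =" + request)
      listener ! request
  }
}

// Hypothetical variant of MainEngineActor: forwards every request
// to a round-robin pool of five ReportingWorker routees.
class PooledEngine(listener: ActorRef) extends Actor {
  val workerActors: ActorRef = context.actorOf(
    RoundRobinPool(5).props(Props(classOf[ReportingWorker], listener)),
    name = "WorkerActors")

  def receive: Receive = {
    case request => workerActors forward request
  }
}

object PoolDemo {
  // Sends 20 messages and returns what the probe collected.
  def run(): Seq[AnyRef] = {
    implicit val system: ActorSystem = ActorSystem("MainEngineActor")
    try {
      val probe = TestProbe()
      val master = system.actorOf(
        Props(classOf[PooledEngine], probe.ref), name = "EngineActor")

      for (_ <- 1 to 10) {
        master ! "Hello"
        master ! "World"
      }

      // Blocks until 20 messages have arrived; fails the test if they
      // do not arrive within 5 seconds.
      probe.receiveN(20, 5.seconds)
    } finally {
      // Stop the actor system so the test JVM can exit cleanly.
      TestKit.shutdownActorSystem(system)
    }
  }
}
```

Because the five routees run concurrently, the order in which the probe sees the messages is not guaranteed, so a test built on this should check counts (10 x Hello, 10 x World) rather than an exact sequence.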
cmbaxter answered Jan 29 '26 02:01
