
Run list of akka actors for list of messages

Tags: scala, akka

I'm new to Akka and would like to run a list of Akka actors at once for a list of messages. For example (I'm using Scala here):

Input:

message_1
message_2
message_3

Output for each message:

1
2
3

This would run 3 Akka actors at once, passing one message from the list to each of them. I would like to use the results later as a List of answers. Ideally, I'm looking for an operation that would give me the following:

runActorsForListAndWaitForAnswer(messagesList).map(_.toInt).sum

I don't know if such a method exists, but it would be helpful. Any help is appreciated, thanks!

asked Dec 12 '22 18:12 by wlk


1 Answer

First of all, you have to abandon the imperative, blocking mindset. Akka works best when everything is asynchronous. You should never block while waiting for another actor's reply. Instead, take the Future that the ask operator (?) returns and apply a function to it. That function will be invoked when the future completes.

Here is an example: suppose you have a SquareActor that takes an Int and replies with its square. The correct way to ask it for a reply is:

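import akka.pattern.ask  // enables ?; an implicit Timeout and ExecutionContext must also be in scope
(squareActor ? 9) map { result =>
  println(result)  // 81
}
// more code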

Very important: the code block with println does not block. "More code" executes immediately, and your callback is invoked some time later, once the reply arrives.
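To make the ordering concrete, here is a minimal sketch that uses only the Scala standard library. A Promise stands in for the actor's reply (an assumption for illustration, no Akka involved), and the object and helper names are hypothetical. The Await at the end exists only so the demo can return its log; it is not something to do inside actor code.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object NonBlockingCallbackDemo {
  def run(): List[String] = {
    val events = ListBuffer.empty[String]
    // Thread-safe logging, because the callback runs on another thread.
    def log(s: String): Unit = events.synchronized { events += s; () }

    // Stand-in for `squareActor ? 9`: the reply has not arrived yet.
    val reply = Promise[Int]()

    // Registering the callback returns immediately; nothing blocks here.
    val handled: Future[Int] = reply.future.map { result =>
      log("callback got " + result)
      result
    }

    log("more code ran")

    // The "reply" arrives only now, so the callback can only run after this point.
    reply.success(81)

    // Demo only: wait for the callback so the log is complete.
    Await.result(handled, 5.seconds)
    events.synchronized { events.toList }
  }

  def main(args: Array[String]): Unit =
    println(run()) // List(more code ran, callback got 81)
}
```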

That being said, we can start implementing your use case. If I understand correctly, you have a list of actors and a list of integers, and you want to send every integer to exactly one actor. Here is some pseudocode:

import akka.actor.ActorRef
import akka.pattern.ask  // enables ?; needs an implicit Timeout in scope
import scala.concurrent.Future  // map and sequence need an implicit ExecutionContext

val actors: List[ActorRef] = //...
val numbers: List[Int] = //...
val actorsWithArguments: List[(ActorRef, Int)] = actors zip numbers
val futuresOfAny: List[Future[Any]] = actorsWithArguments map { case (actor, number) => actor ? number }
val futures: List[Future[Int]] = futuresOfAny map { _.mapTo[Int] }
val futureOfAllResults: Future[List[Int]] = Future.sequence(futures)
val future: Future[Int] = futureOfAllResults map { _.sum }

I left explicit types intentionally to help you follow the code. Let's go step by step:

  • actorsWithArguments is a List of tuples. Each item pairs an actor with a message: (actor1, message1), (actor2, message2), ...

  • futuresOfAny contains the list of Future results of the actor ? number invocations. ? returns a Future[Any] because: a) the result is not (yet) known, and b) Akka doesn't know what the type of the reply message will be.

  • futures is strongly typed, since we know that each reply is an Int. We use mapTo to turn each Future[Any] into a Future[Int]. If a reply is not actually an Int, that future fails with a ClassCastException.

  • futureOfAllResults employs a really awesome transformation from a List[Future[Int]] to a Future[List[Int]]. In other words, we just turned a list of future results into a single future result holding all items.

  • future holds (will hold in the future) your result.
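The steps above can be tried without an actor system. The following is a minimal sketch, assuming a hypothetical fakeAsk function that stands in for actor ? number by returning an untyped Future[Any]; everything else is plain scala.concurrent. The Await in main is only there so the demo prints before the JVM exits.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceDemo {
  // Hypothetical stand-in for `actor ? number`: an untyped reply computed asynchronously.
  def fakeAsk(number: Int): Future[Any] = Future(number * number)

  def sumOfReplies(numbers: List[Int]): Future[Int] = {
    // One untyped future per message.
    val futuresOfAny: List[Future[Any]] = numbers.map(fakeAsk)
    // Narrow each reply to Int; a non-Int reply would fail that future.
    val futures: List[Future[Int]] = futuresOfAny.map(_.mapTo[Int])
    // List[Future[Int]] => Future[List[Int]]
    val futureOfAllResults: Future[List[Int]] = Future.sequence(futures)
    // Still a future: the sum is computed once all replies are in.
    futureOfAllResults.map(_.sum)
  }

  def main(args: Array[String]): Unit =
    println(Await.result(sumOfReplies(List(1, 2, 3)), 5.seconds)) // 14
}
```

Note that Future.sequence preserves the order of the input list, and the resulting future fails if any of the individual futures fails.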

If you think that's a lot of code, it was just for educational purposes. Chaining and type inference can do magic:

val future = Future.sequence(actors zip numbers map { case (actor, number) => actor ? number } map { _.mapTo[Int] }) map { _.sum }

Finally, you have your result. Well, you will have it in the future. Now, if you want to send that result to another actor, you say:

future map {sum => otherActor ! sum}
//or even...
future map otherActor.!
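Putting the pieces together, here is a hedged end-to-end sketch for Akka classic actors. It assumes akka-actor 2.6.x is on the classpath (the first line is a scala-cli dependency directive and the version is an assumption), and SquareActor, PrinterActor, and AskManyActors are hypothetical names invented for this illustration. Instead of map with !, it uses the pipe pattern (pipeTo from akka.pattern), which also forwards a failed future to the recipient as a Status.Failure message.

```scala
//> using dep "com.typesafe.akka::akka-actor:2.6.20"
import akka.actor.{Actor, ActorRef, ActorSystem, Props, Status}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical actor: replies to each Int with its square.
class SquareActor extends Actor {
  def receive: Receive = {
    case n: Int => sender() ! n * n
  }
}

// Hypothetical recipient: prints the sum (or the failure), then stops the demo.
class PrinterActor extends Actor {
  def receive: Receive = {
    case sum: Int =>
      println(s"sum of squares: $sum")
      context.system.terminate()
    case Status.Failure(cause) =>
      println(s"asking failed: $cause")
      context.system.terminate()
  }
}

object AskManyActors {
  // Sends each number to the actor at the same position and sums the replies.
  def sumOfSquares(actors: List[ActorRef], numbers: List[Int])
                  (implicit ec: ExecutionContext, timeout: Timeout): Future[Int] =
    Future.sequence(
      actors.zip(numbers).map { case (actor, number) => (actor ? number).mapTo[Int] }
    ).map(_.sum)

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("demo")
    import system.dispatcher // ExecutionContext for the future callbacks
    implicit val timeout: Timeout = Timeout(3.seconds) // how long each ask may wait

    val numbers = List(1, 2, 3)
    val actors = numbers.map(_ => system.actorOf(Props(new SquareActor)))
    val printer = system.actorOf(Props(new PrinterActor))

    // No blocking anywhere: the sum is delivered to printer as a message.
    sumOfSquares(actors, numbers).pipeTo(printer) // prints "sum of squares: 14"
  }
}
```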

I strongly encourage you to read the chapter on Futures in the official documentation to make this all clearer.

answered Dec 14 '22 22:12 by Tomasz Nurkiewicz