I'm new to Akka and I would like to run a list of Akka actors at once for a list of messages. For example (I'm using Scala here):
input
message_1
message_2
message_3
output for each message:
1
2
3
This would run 3 Akka actors at once, passing each item from the list to one of them. I would like to use the results later as a List of answers. Ideally, I'm looking for an operation that would give me the following:
runActorsForListAndWaitForAnswer(messagesList).map(_.toInt).sum
I don't know if such a method exists, but it would be helpful. Any help is appreciated, thanks!
First of all, you have to abandon the imperative, blocking mindset. Akka works best when everything is asynchronous. You should never block waiting for a reply from another actor. Instead, you can take a Future[T] object and apply a function to it. This function will be invoked when the future completes.
Let's take an example: you have a SquareActor that takes an Int and returns its square. The correct way of asking it for a reply is:
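// needs: import akka.pattern.ask, an implicit akka.util.Timeout,
// and an implicit ExecutionContext in scope
(squareActor ? 9) map { result =>
  println(result) // 81
}
// more code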
Very important: the code block with println won't block. The code that follows it ("more code") executes immediately, and your callback is invoked some time later, once the reply arrives.
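To make the ordering concrete, here is a minimal sketch that uses only the Scala standard library. A Promise stands in for the pending actor reply, and the names NonBlockingCallbackDemo, run and events are hypothetical, chosen only for this illustration. The Await at the end exists only so the demo can return what it observed; it is not something you would do in actor code.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object NonBlockingCallbackDemo {
  // Returns the order in which things happened, plus the squared value.
  def run(): (List[String], Int) = {
    val events = new ConcurrentLinkedQueue[String]()
    val reply = Promise[Int]() // stand-in for a reply that has not arrived yet

    // Registering the callback returns immediately; the body has not run yet.
    val squared: Future[Int] = reply.future.map { n =>
      events.add("callback")
      n * n
    }

    events.add("more code") // runs before any reply exists
    reply.success(9)        // the "reply" arrives later

    // Blocking here only so the demo can report its observations.
    val result = Await.result(squared, 2.seconds)
    (events.toArray(new Array[String](0)).toList, result)
  }
}
```

The callback cannot run before the promise is completed, so "more code" is always recorded first.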
That being said, we can start implementing your use case. If I understand correctly, you have a list of actors and a list of integers, and you want to send every integer to exactly one actor. Here is some pseudocode:
val actors: List[ActorRef] = //...
val numbers: List[Int] = //...
val actorsWithArguments: List[(ActorRef, Int)] = actors zip numbers
val futuresOfAny: List[Future[Any]] = actorsWithArguments map { case (actor, number) => actor ? number}
val futures: List[Future[Int]] = futuresOfAny map {_.mapTo[Int]}
val futureOfAllResults: Future[List[Int]] = Future.sequence(futures)
val future: Future[Int] = futureOfAllResults map { _.sum}
I left explicit types intentionally to help you follow the code. Let's go step by step:
actorsWithArguments is a List of tuples. Each item holds a pair of an actor and a message: (actor1, message1), (actor2, message2), ...
futuresOfAny contains a list of the Future results of each actor ? number invocation. ? returns a Future[Any] because: a) the result is not (yet) known and b) Akka doesn't know what the type of the result (the reply message) will be.
futures is strongly typed, since we know that each reply is an Int. We mapTo each Future[Any] to Future[Int].
futureOfAllResults employs a really awesome transformation from a List[Future[Int]] to a Future[List[Int]]. In other words, we just turned a list of future results into a single future result with all items.
future holds (will hold in the future) your result.
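The steps above can be put together into a runnable sketch. It assumes classic (untyped) Akka actors on the classpath; the SquareActor body, the SumOfSquares object, the sumOfReplies helper and the 3-second timeout are all assumptions made for illustration, not part of the original pseudocode.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical actor matching the SquareActor described earlier.
class SquareActor extends Actor {
  def receive: Receive = {
    case n: Int => sender() ! (n * n)
  }
}

object SumOfSquares {
  // Sends numbers(i) to actors(i) and returns a future of the summed replies.
  def sumOfReplies(actors: List[ActorRef], numbers: List[Int])
                  (implicit timeout: Timeout, ec: ExecutionContext): Future[Int] = {
    val replies: List[Future[Int]] =
      actors.zip(numbers).map { case (actor, number) => (actor ? number).mapTo[Int] }
    Future.sequence(replies).map(_.sum)
  }

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("squares")
    import system.dispatcher // ExecutionContext for the future callbacks
    implicit val timeout: Timeout = Timeout(3.seconds) // assumed value

    val numbers = List(1, 2, 3)
    val actors = numbers.map(_ => system.actorOf(Props(new SquareActor)))

    sumOfReplies(actors, numbers)
      .andThen { case outcome => println(outcome) } // a Try holding the sum or a failure
      .onComplete(_ => system.terminate())
  }
}
```

Note that ? needs an implicit Timeout: if any actor fails to reply in time, its future fails, and Future.sequence then fails the combined future as well.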
If you think it's a lot of code, it was just for educational purposes. Chaining and type inference can do magic:
val future = Future.sequence(actors zip numbers map { case (actor, number) => actor ? number } map { _.mapTo[Int] }) map { _.sum }
Finally, you have your result. Well, you will have it in the future. Now, if you want to send that result to another actor, you can write:
future map {sum => otherActor ! sum}
//or even...
future map otherActor.!
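As an alternative to sending from inside a callback, Akka's pipe pattern forwards a future's outcome to an actor. The sketch below assumes classic Akka actors; Collector, PipeDemo and deliver are hypothetical names, and the Promise inside Collector exists only so the delivery can be observed from outside.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.pipe
import scala.concurrent.{Future, Promise}

// Hypothetical receiver: completes a Promise with the first Int it gets.
class Collector(received: Promise[Int]) extends Actor {
  def receive: Receive = {
    case sum: Int => received.trySuccess(sum)
  }
}

object PipeDemo {
  // Pipes the future's value to a Collector and returns what it received.
  def deliver(system: ActorSystem, future: Future[Int]): Future[Int] = {
    import system.dispatcher // ExecutionContext required by pipe
    val received = Promise[Int]()
    val otherActor = system.actorOf(Props(new Collector(received)))
    future.pipeTo(otherActor) // sends the value once the future completes
    received.future
  }
}
```

One difference from the map version: if the future fails, pipeTo sends an akka.actor.Status.Failure to the recipient, whereas a map callback simply never runs.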
I strongly encourage you to read the chapter on Futures in the official documentation to make all of this clearer.