What is the proper way to wait for the result of multiple actors in Akka?
The Principles of Reactive Programming Coursera course had an exercise with a replicated key-value store. Without going into the details of the assignment, it required waiting on the acknowledgement of multiple actors before it could indicate the replication was complete.
I implemented the assignment using a mutable map containing the outstanding requests, but I felt the solution had a 'bad smell'. I hoped there was a better way to implement what seems like a common scenario.
To uphold the class's honor code, I am withholding my solution to the exercise and describing an abstract use case with a similar problem instead.
An invoice line item needs to calculate its tax liability. The tax liability is the combination of all the taxes applied to the line item across multiple taxing authorities (e.g., federal, state, police district). If each taxing authority were an actor capable of determining the tax liability of the line item, the line item would need all actors to report before it could report the overall tax liability. What is the best/right way to accomplish this scenario in Akka?
Akka's approach to handling concurrency is based on the Actor Model. In an actor-based system, everything is an actor, in much the same way that everything is an object in object-oriented design.
In Akka, each actor processes one message at a time, which gives the illusion of single-threaded execution: the framework takes care of threading issues while allowing us to focus on the behavior that needs to be implemented. Actors may only communicate with each other and the outside world through messages.
Threading: Akka actors are similar to Verticles in Vert.x in the sense that both are the unit of execution. Like actors, Verticles run on the event loop by default, which is essentially a thread pool that must not be blocked.
Interacting with an Actor in Akka is done through an ActorRef[T] where T is the type of messages the actor accepts, also known as the “protocol”. This ensures that only the right kind of messages can be sent to an actor and also that no one else but the Actor itself can access the Actor instance internals.
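As a rough illustration of what such a typed protocol can look like, here is a minimal sketch using Akka Typed (it assumes the akka-actor-typed module is on the classpath). The FederalTax object, its Command trait, and the rate parameter are hypothetical names invented for this example; they are not part of the question or of the answers.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

object FederalTax {
  // The protocol: the only messages an ActorRef[Command] will accept
  sealed trait Command
  final case class GetTaxAmount(grossEarnings: Double,
                                replyTo: ActorRef[TaxResult]) extends Command

  // Reply type, sent back to whoever is named in replyTo
  final case class TaxResult(amount: Double)

  // rate is a hypothetical parameter, e.g. 0.25 for 25%
  def apply(rate: Double): Behavior[Command] =
    Behaviors.receiveMessage {
      case GetTaxAmount(gross, replyTo) =>
        replyTo ! TaxResult(gross * rate)
        Behaviors.same // keep the same behavior for the next message
    }
}
```

Because the reply address travels inside the message, sending anything other than a Command to this actor is rejected at compile time.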
Here is a simplified example of what I believe you are looking for. It shows how a master-like actor spawns some child workers and then waits for all of their responses, handling the case where a timeout occurs while waiting for results. The solution shows how to wait for an initial request and then switch over to a new receive function while waiting for the responses. It also shows how to propagate state into the waiting receive function, which avoids explicit mutable state at the instance level.
import akka.actor.{Actor, ActorRef, Props, ReceiveTimeout}

object TaxCalculator {
  sealed trait TaxType
  case object StateTax extends TaxType
  case object FederalTax extends TaxType
  case object PoliceDistrictTax extends TaxType
  val AllTaxTypes: Set[TaxType] = Set(StateTax, FederalTax, PoliceDistrictTax)

  // Messages exchanged between the caller, the calculator and its children
  case class GetTaxAmount(grossEarnings: Double)
  case class TaxResult(taxType: TaxType, amount: Double)
  case class TotalTaxResult(taxAmount: Double)
  case object TaxCalculationTimeout
}
class TaxCalculator extends Actor {
  import TaxCalculator._
  import context._
  import scala.concurrent.duration._

  def receive = waitingForRequest

  // Initial state: wait for a request and fan it out to one child per tax type
  def waitingForRequest: Receive = {
    case gta: GetTaxAmount =>
      val children = AllTaxTypes map (tt => actorOf(propsFor(tt)))
      children foreach (_ ! gta)
      // Inactivity timeout: fires if no message arrives for 2 seconds
      setReceiveTimeout(2.seconds)
      become(waitingForResponses(sender(), AllTaxTypes))
  }

  // Collecting state: the results so far are carried in the taxes parameter
  def waitingForResponses(respondTo: ActorRef, expectedTypes: Set[TaxType], taxes: Map[TaxType, Double] = Map.empty): Receive = {
    case TaxResult(tt, amount) =>
      val newTaxes = taxes + (tt -> amount)
      if (newTaxes.keySet == expectedTypes) {
        respondTo ! TotalTaxResult(newTaxes.values.sum)
        context stop self
      } else {
        become(waitingForResponses(respondTo, expectedTypes, newTaxes))
      }
    case ReceiveTimeout =>
      respondTo ! TaxCalculationTimeout
      context stop self
  }

  def propsFor(taxType: TaxType): Props = taxType match {
    case StateTax          => Props[StateTaxCalculator]
    case FederalTax        => Props[FederalTaxCalculator]
    case PoliceDistrictTax => Props[PoliceDistrictTaxCalculator]
  }
}
// Shared behavior for the workers: reply with a flat percentage of the earnings
trait TaxCalculatingActor extends Actor {
  import TaxCalculator._
  val taxType: TaxType
  val percentage: Double

  def receive = {
    case GetTaxAmount(earnings) =>
      val tax = earnings * percentage
      sender() ! TaxResult(taxType, tax)
  }
}

class FederalTaxCalculator extends TaxCalculatingActor {
  val taxType = TaxCalculator.FederalTax
  val percentage = 0.20
}

class StateTaxCalculator extends TaxCalculatingActor {
  val taxType = TaxCalculator.StateTax
  val percentage = 0.10
}

class PoliceDistrictTaxCalculator extends TaxCalculatingActor {
  val taxType = TaxCalculator.PoliceDistrictTax
  val percentage = 0.05
}
Then you could test this out with the following code:
import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import TaxCalculator._

implicit val timeout: Timeout = Timeout(5.seconds)
val system = ActorSystem("taxes")
import system.dispatcher // ExecutionContext for the Future callback

val cal = system.actorOf(Props[TaxCalculator])
val fut = cal ? GetTaxAmount(1000.00)
fut onComplete {
  case Success(TotalTaxResult(amount)) =>
    println(s"Got tax total of $amount")
  case Success(TaxCalculationTimeout) =>
    println("Got timeout calculating tax")
  case Success(other) =>
    println(s"Got unexpected reply: $other")
  case Failure(ex) =>
    println(s"Got exception calculating tax: ${ex.getMessage}")
}
This is a very common problem in Akka. You have multiple actors that each do part of the job, and you need to combine their results.
The solution proposed by Jamie Allen in his book "Effective Akka" (his example was about getting bank account balances from various types of accounts) is to spawn an actor that in turn spawns multiple actors to do the job (e.g. calculate your tax) and then waits for all of them to answer.
One catch: you should not use ask but instead tell.
When you spawn your multiple actors (e.g. FederalTaxActor, StateTaxActor...) you send each of them a message with the data it needs to process. You then know how many answers you need to collect. With every response you check whether all responses have arrived. If not, you keep waiting.
The problem is that you might wait forever if any of the actors fails. So you schedule a timeout message to yourself. If not all answers have arrived when it fires, you report that the operation did not complete successfully.
Akka has helpers for scheduling a timeout to yourself, for example the receive timeout (context.setReceiveTimeout) used in the other answer, or the scheduler's scheduleOnce.
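The tell-plus-timeout approach described above can be sketched roughly as follows, using classic Akka actors. This is a minimal sketch under stated assumptions, not the code from the book: the Aggregator class and its Start, Reply, Total, Incomplete and Deadline messages are hypothetical names made up for illustration, and the 2-second deadline is arbitrary. It uses the scheduler's scheduleOnce, which gives a fixed overall deadline; the receive timeout, by contrast, is an inactivity timeout that restarts whenever a message arrives.

```scala
import akka.actor.{Actor, ActorRef, Cancellable}
import scala.concurrent.duration._

object Aggregator {
  // Hypothetical protocol, invented for this sketch
  final case class Start(workers: Set[ActorRef], request: Any)
  final case class Reply(value: Double)
  final case class Total(value: Double)
  case object Incomplete
  private case object Deadline // timeout message the actor sends to itself
}

class Aggregator extends Actor {
  import Aggregator._
  import context.dispatcher // ExecutionContext required by the scheduler

  def receive: Receive = {
    case Start(workers, request) =>
      // tell, not ask: replies arrive as ordinary messages to self
      workers.foreach(_ ! request)
      val timer = context.system.scheduler.scheduleOnce(2.seconds, self, Deadline)
      context.become(collecting(sender(), workers.size, Vector.empty, timer))
  }

  // State is passed as parameters, so no mutable fields are needed
  def collecting(replyTo: ActorRef, expected: Int,
                 got: Vector[Double], timer: Cancellable): Receive = {
    case Reply(v) =>
      val all = got :+ v
      if (all.size == expected) {
        timer.cancel()
        replyTo ! Total(all.sum)
        context.stop(self)
      } else context.become(collecting(replyTo, expected, all, timer))
    case Deadline =>
      // Not every worker answered in time
      replyTo ! Incomplete
      context.stop(self)
  }
}
```

Counting replies assumes each worker answers at most once; keying the collected results by worker or by result type, as the other answer does with its Map, is more robust against duplicates.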