Waiting for multiple results in Akka

What is the proper way to wait for the result of multiple actors in Akka?

The Principles of Reactive Programming Coursera course had an exercise with a replicated key-value store. Without going into the details, the assignment required waiting for acknowledgements from multiple actors before it could indicate that replication was complete.

I implemented the assignment using a mutable map containing the outstanding requests, but I felt the solution had a 'bad smell'. I hoped there was a better way to implement what seems like a common scenario.

To uphold the class's honor code, I am withholding my solution to the exercise. Instead, here is an abstract use case that describes a similar problem.

An invoice line item needs to calculate its tax liability. The tax liability is the combination of all the taxes applied to the line item across multiple taxing authorities (e.g., federal, state, police district). If each taxing authority were an actor capable of determining the tax liability of the line item, the line item would need all actors to report before it could continue and report the overall tax liability. What is the best/right way to accomplish this scenario in Akka?

asked Mar 31 '14 19:03 by Kevin


2 Answers

Here is a simplified example of what I believe you are looking for. It shows how a master-like actor spawns some child workers and then waits for all of their responses, handling the case where a timeout occurs while waiting for results. The solution shows how to wait for an initial request and then switch to a new receive function while waiting for the responses. It also shows how to pass state into the waiting receive function, which avoids explicit mutable state at the instance level.

import akka.actor._

object TaxCalculator {
  sealed trait TaxType
  case object StateTax extends TaxType
  case object FederalTax extends TaxType
  case object PoliceDistrictTax extends TaxType
  val AllTaxTypes:Set[TaxType] = Set(StateTax, FederalTax, PoliceDistrictTax)

  case class GetTaxAmount(grossEarnings:Double)
  case class TaxResult(taxType:TaxType, amount:Double)  

  case class TotalTaxResult(taxAmount:Double)
  case object TaxCalculationTimeout
}

class TaxCalculator extends Actor{
 import TaxCalculator._
 import context._
 import concurrent.duration._

  def receive =  waitingForRequest

  def waitingForRequest:Receive = {
    case gta:GetTaxAmount =>
      val children = AllTaxTypes map (tt => actorOf(propsFor(tt)))
      children foreach (_ ! gta)
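      // ReceiveTimeout fires after 2 seconds without any incoming message,
      // so the timer restarts each time a reply arrives.
      setReceiveTimeout(2.seconds)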
      become(waitingForResponses(sender, AllTaxTypes))
  }

  def waitingForResponses(respondTo:ActorRef, expectedTypes:Set[TaxType], taxes:Map[TaxType, Double] = Map.empty):Receive = {
    case TaxResult(tt, amount) =>
      val newTaxes = taxes + (tt -> amount)
      if (newTaxes.keySet == expectedTypes){
        respondTo ! TotalTaxResult(newTaxes.values.sum)
        context stop self
      }
      else{
        become(waitingForResponses(respondTo, expectedTypes, newTaxes))
      }

    case ReceiveTimeout =>
      respondTo ! TaxCalculationTimeout
      context stop self
  }

  def propsFor(taxType:TaxType) = taxType match{
    case StateTax => Props[StateTaxCalculator]
    case FederalTax => Props[FederalTaxCalculator]
    case PoliceDistrictTax => Props[PoliceDistrictTaxCalculator]
  }  
}

trait TaxCalculatingActor extends Actor{  
  import TaxCalculator._
  val taxType:TaxType
  val percentage:Double

  def receive = {
    case GetTaxAmount(earnings) => 
      val tax = earnings * percentage
      sender ! TaxResult(taxType, tax)
  }
}

class FederalTaxCalculator extends TaxCalculatingActor{
  val taxType = TaxCalculator.FederalTax
  val percentage = 0.20
}

class StateTaxCalculator extends TaxCalculatingActor{
  val taxType = TaxCalculator.StateTax
  val percentage = 0.10
}

class PoliceDistrictTaxCalculator extends TaxCalculatingActor{
  val taxType = TaxCalculator.PoliceDistrictTax
  val percentage = 0.05
}

Then you could test this out with the following code:

import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import TaxCalculator._

implicit val timeout: Timeout = Timeout(5.seconds)

val system = ActorSystem("taxes")
import system.dispatcher // execution context for the future callback
val cal = system.actorOf(Props[TaxCalculator])
val fut = cal ? GetTaxAmount(1000.00)
fut onComplete{
  case Success(TotalTaxResult(amount)) =>
    println(s"Got tax total of $amount")
  case Success(TaxCalculationTimeout) =>
    println("Got timeout calculating tax")
  case Success(other) =>
    println(s"Got unexpected reply: $other")
  case Failure(ex) => 
    println(s"Got exception calculating tax: ${ex.getMessage}")
}
answered Sep 19 '22 11:09 by cmbaxter


This is a very common problem in Akka. You have multiple actors that each do part of the job for you, and you need to combine their results.

The solution proposed by Jamie Allen in his book "Effective Akka" (his example was about getting bank account balances from various types of accounts) is to spawn one actor that in turn spawns the multiple actors that do the job (e.g. calculate your tax), and then waits for all of them to answer.

One catch: you should not use ask but instead tell.

When you spawn your multiple actors (e.g. FederalTaxActor, StateTaxActor...) you send each of them a message with the data they need to process. At that point you know how many answers you need to collect. With every response you check whether all responses have arrived. If not, you keep waiting.
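The bookkeeping described above can be sketched as a small immutable value that is passed from one receive function to the next. This is only an illustration: `Pending` and its methods are hypothetical names, not part of Akka, and the sketch assumes each responder can be identified by a key and answers with a `Double`.

```scala
// Hypothetical helper (not an Akka class): immutable record of which
// replies are expected and which have arrived so far.
final case class Pending[K](expected: Set[K],
                            received: Map[K, Double] = Map.empty[K, Double]) {

  // Record one reply; replies from keys we never asked are ignored.
  def record(key: K, amount: Double): Pending[K] =
    if (expected.contains(key)) copy(received = received + (key -> amount))
    else this

  // True once every expected responder has answered.
  def isComplete: Boolean = received.keySet == expected

  // Responders that have not answered yet, e.g. for a timeout report.
  def missing: Set[K] = expected -- received.keySet

  // Sum of all amounts received so far.
  def total: Double = received.values.sum
}

// Usage: each reply produces a new value instead of mutating a map.
val start    = Pending(Set("federal", "state"))
val afterOne = start.record("federal", 200.0)
val done     = afterOne.record("state", 100.0)
```

Because every update returns a new `Pending`, the collecting actor can hand the latest value to `context.become` and needs no mutable field.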

The problem is that you might wait forever if any of the actors fails. So you schedule a timeout message to yourself. If that message arrives before all answers are in, you reply that the operation did not complete successfully.

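Akka has helpers for scheduling a timeout to yourself: context.setReceiveTimeout, which delivers a ReceiveTimeout message after a period with no incoming messages, and the system scheduler (context.system.scheduler.scheduleOnce), which sends a message to an actor such as self after a fixed delay.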
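A minimal sketch of the scheduled-timeout variant with classic Akka actors follows. The message types (`Collect`, `Reply`, `Collected`, `CollectionTimedOut`, `Deadline`) and the `Collector` class are made up for this example, and it assumes the caller passes a non-empty set of workers.

```scala
import akka.actor.{Actor, ActorRef, Cancellable}
import scala.concurrent.duration._

// Hypothetical protocol for this sketch; none of these names come from Akka.
case class Collect(workers: Set[ActorRef], request: Any)
case class Reply(amount: Double)
case class Collected(total: Double)
case object CollectionTimedOut
case object Deadline

class Collector extends Actor {
  import context.dispatcher // execution context used by the scheduler

  def receive: Receive = idle

  def idle: Receive = {
    case Collect(workers, request) =>
      workers.foreach(_ ! request) // tell, not ask
      // One-shot message to self after a fixed delay. Unlike
      // setReceiveTimeout, this deadline does not restart on each reply.
      val deadline =
        context.system.scheduler.scheduleOnce(2.seconds, self, Deadline)
      context.become(collecting(sender(), workers, 0.0, deadline))
  }

  def collecting(replyTo: ActorRef, waitingOn: Set[ActorRef],
                 total: Double, deadline: Cancellable): Receive = {
    // Only count replies from workers that have not answered yet.
    case Reply(amount) if waitingOn.contains(sender()) =>
      val remaining = waitingOn - sender()
      if (remaining.isEmpty) {
        deadline.cancel()
        replyTo ! Collected(total + amount)
        context.stop(self)
      } else {
        context.become(collecting(replyTo, remaining, total + amount, deadline))
      }

    case Deadline =>
      replyTo ! CollectionTimedOut
      context.stop(self)
  }
}
```

The trade-off between the two helpers is the meaning of the deadline: `scheduleOnce` bounds the total time for the whole collection, while `setReceiveTimeout` bounds the gap between consecutive messages.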

answered Sep 20 '22 11:09 by almendar