Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

akka: pattern for combining messages from multiple children

Here's the pattern I have come across:

An actor A has multiple children C1, ..., Cn. On receiving a message, A sends it to each of its children, which each do some calculation on the message, and on completion send it back to A. A would then like to combine the results of all the children to pass onto another actor.

What would a solution for this problem look like? Or is this an anti-pattern? In which case how should this problem be approached?

Here is a trivial example which hopefully illustrates my current solution. My concerns are that is duplicates code (up to symmetry); does not extend very well to 'lots' of children; and makes it quite hard to see what's going on.

import akka.actor.{Props, Actor}

case class Tagged[T](value: T, id: Int)

class A extends Actor {
  import C1._
  import C2._

  val c1 = context.actorOf(Props[C1], "C1")
  val c2 = context.actorOf(Props[C2], "C2")
  var uid = 0
  var c1Results = Map[Int, Int]()
  var c2Results = Map[Int, Int]()

  def receive = {
    case n: Int => {
      c1 ! Tagged(n, uid)
      c2 ! Tagged(n, uid)
      uid += 1
    }
    case Tagged(C1Result(n), id) => c2Results get id match {
      case None => c1Results += (id -> n)
      case Some(m) => {
        c2Results -= id
        context.parent ! (n, m)
      }
    }
    case Tagged(C2Result(n), id) => c1Results get id match {
      case None => c2Results += (id -> n)
      case Some(m) => {
        c1Results -= id
        context.parent ! (m, n)
      }
    }
  }
}

class C1 extends Actor {
  import C1._

  def receive = {
    case Tagged(n: Int, id) => Tagged(C1Result(n), id)
  }
}

object C1 {
  case class C1Result(n: Int)
}

class C2 extends Actor {
  import C2._

  def receive = {
    case Tagged(n: Int, id) => Tagged(C2Result(n), id)
  }
}    

object C2 {
  case class C2Result(n: Int)
}

If you think the code looks god-awful, take it easy on me, I've just started learning akka ;)

like image 759
Mullefa Avatar asked Jun 11 '15 21:06

Mullefa


People also ask

How does Akka handle concurrency?

Akka's approach to handling concurrency is based on the Actor Model. In an actor-based system, everything is an actor, in much the same way that everything is an object in object-oriented design.

What is alternative to Akka?

Spring, Scala, Erlang, Kafka, and Spring Boot are the most popular alternatives and competitors to Akka.

What is ActorRef in Akka?

java.io.Serializable. An ActorRef is the identity or address of an Actor instance. It is valid only during the Actor’s lifetime and allows messages to be sent to that Actor instance.

What is Akka typed?

Akka “Typed Actors”, now replaced by Akka Typed, were an implementation of the Active Objects pattern. Essentially turning method invocations into asynchronous dispatch instead of synchronous that has been the default way since Smalltalk came out.


1 Answers

In the case of many - or a varying number of - child actors, the ask pattern suggested by Zim-Zam will quickly get out of hand.

The aggregator pattern is designed to help with this kind of situation. It provides an Aggregator trait that you can use in an actor to perform your aggregation logic.

A client actor wanting to perform an aggregation can start an Aggregator based actor instance and send it a message that will kick off the aggregation process.

A new aggregator should be created for each aggregation operation and terminate on sending back the result (when it has received all responses or on a timeout).

An example of this pattern to sum integer values held by the actors represented by the Child class is listed below. (Note that there is no need for them to all be children supervised by the same parent actor: the SummationAggregator just needs a collection of ActorRefs.)

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

import akka.actor._
import akka.contrib.pattern.Aggregator

object Child {
  def props(value: Int): Props = Props(new Child(value))

  case object GetValue
  case class GetValueResult(value: Int)
}

class Child(value: Int) extends Actor {
  import Child._

  def receive = { case GetValue => sender ! GetValueResult(value) }
}

object SummationAggregator {
  def props = Props(new SummationAggregator)

  case object TimedOut
  case class StartAggregation(targets: Seq[ActorRef])
  case object BadCommand
  case class AggregationResult(sum: Int)
}

class SummationAggregator extends Actor with Aggregator {
  import Child._
  import SummationAggregator._

  expectOnce {
    case StartAggregation(targets) =>
      // Could do what this handler does in line but handing off to a 
      // separate class encapsulates the state a little more cleanly
      new Handler(targets, sender())
    case _ =>
      sender ! BadCommand
      context stop self
  }

  class Handler(targets: Seq[ActorRef], originalSender: ActorRef) {
    // Could just store a running total and keep track of the number of responses 
    // that we are awaiting...
    var valueResults = Set.empty[GetValueResult]

    context.system.scheduler.scheduleOnce(1.second, self, TimedOut)

    expect {
      case TimedOut =>
        // It might make sense to respond with what we have so far if some responses are still awaited...
        respondIfDone(respondAnyway = true)
    }

    if (targets.isEmpty)
      respondIfDone()
    else
      targets.foreach { t =>
        t ! GetValue
        expectOnce {
          case vr: GetValueResult =>
            valueResults += vr
            respondIfDone()
        }
      }

    def respondIfDone(respondAnyway: Boolean = false) = {
      if (respondAnyway || valueResults.size == targets.size) {
        originalSender ! AggregationResult(valueResults.foldLeft(0) { case (acc, GetValueResult(v)) => acc + v })
        context stop self
      }
    }
  }
}

To use this SummationAggregator from your parent actor you could do:

context.actorOf(SummationAggregator.props) ! StartAggregation(children)

and then handle AggregationResult somewhere in the parent's receive.

like image 130
Jeremy Stone Avatar answered Nov 16 '22 04:11

Jeremy Stone