
Akka actors, Futures, and closures

I read in the Akka docs that it's dangerous to close over variables from an enclosing actor.

Warning

In this case you need to carefully avoid closing over the containing actor’s reference, i.e. do not call methods on the enclosing actor from within the anonymous Actor class. This would break the actor encapsulation and may introduce synchronization bugs and race conditions because the other actor’s code will be scheduled concurrently to the enclosing actor.

Now, I have two actors, one of which requests something from the second and does something with the result. In the example below, actor Accumulator retrieves numbers from actor NumberGenerator and adds them up, reporting the running sum along the way.

This can be done in at least two ways, shown below as two alternative receive functions (A and B). A does not close over the counter variable: it pipes the reply back to itself and adds the Int when it arrives as a message. B registers an onSuccess callback on the Future that closes over counter and does the sum there. If I understand properly how this works, that callback runs within an anonymous actor created just to process onSuccess.

import com.esotericsoftware.minlog.Log

import akka.actor.{Actor, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import akka.util.duration._

case object Start
case object Request


object ActorTest {
  var wake = 0

  val accRef = Main.actorSystem.actorOf(Props[Accumulator], name = "accumulator")
  val genRef = Main.actorSystem.actorOf(Props[NumberGenerator], name = "generator")

  Log.info("ActorTest", "Starting !")

  accRef ! Start
}

class Accumulator extends Actor {
  var counter = 0

  implicit val timeout = Timeout(5 seconds)

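  // A: WITHOUT CLOSURE
  // The reply is piped back to this actor, so counter is only touched inside receive.
  def receive = {
    case Start => ask(ActorTest.genRef, Request).mapTo[Int] pipeTo self
    case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start
  }
  // B: WITH CLOSURE
  // Alternative to A. A class can define receive only once, so B is commented out;
  // swap the comment markers to try it. The callback closes over counter.
  /*
  def receive = {
    case Start => ask(ActorTest.genRef, Request).mapTo[Int] onSuccess {
      case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start
    }
  }
  */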
}

class NumberGenerator extends Actor {
  val rand = new java.util.Random()

  def receive = {
    case Request => sender ! rand.nextInt(11)-5
  }
}

Is it absolutely evil to use closures in this case? Of course I could use an AtomicInteger instead of an Int, or, in some networking scenario using, say, Netty, issue a write operation on a thread-safe channel, but that is not my point here.

At the risk of asking the ridiculous: is there a way for the Future's onSuccess to execute in this actor instead of an anonymous middle actor, without defining a case in the receive function?

EDIT

To put it more clearly, my question is: is there a way to force a series of Futures to run in the same thread as a given Actor?

gsimard asked Jun 21 '12 00:06


2 Answers

The problem is that the onSuccess callback runs on a different thread from the one executing the actor's receive, so both can touch counter at the same time. You could use the pipeTo approach, or use an Agent. Making counter an AtomicInteger would solve the problem, but it's not so clean: it breaks the Actor model.
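
As an illustration of the pipeTo approach (a sketch, not code from this answer), the future's callback can be limited to wrapping the reply in a message, so that all state changes happen inside receive. This assumes a newer classic Akka (2.1 or later, where futures come from scala.concurrent) rather than the 2.0 API used in the question. The Generated and GetCounter messages and the constructor parameter generator are hypothetical names introduced here.

```scala
// Sketch only: assumes classic Akka 2.1+ (scala.concurrent futures and durations).
import akka.actor.{Actor, ActorRef, Status}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.duration._

case object Start
case object Request
case object GetCounter                 // hypothetical query message, not in the question
final case class Generated(value: Int) // hypothetical wrapper message, not in the question

class NumberGenerator extends Actor {
  private val rand = new java.util.Random()

  def receive: Receive = {
    case Request => sender() ! (rand.nextInt(11) - 5)
  }
}

// The generator reference is passed in instead of read from a global (an assumption).
class Accumulator(generator: ActorRef) extends Actor {
  import context.dispatcher // implicit ExecutionContext used by map and pipeTo

  private var counter = 0
  private implicit val timeout: Timeout = Timeout(5.seconds)

  def receive: Receive = {
    case Start =>
      // The callback only wraps the reply; it never reads or writes actor state.
      (generator ? Request).mapTo[Int].map(Generated(_)).pipeTo(self)

    case Generated(x) =>
      // Handled as an ordinary message, one at a time, so mutating counter is safe.
      counter += x
      self ! Start

    case Status.Failure(_) =>
      // pipeTo forwards a failed future (for example an ask timeout) as Status.Failure.
      // This sketch simply asks again.
      self ! Start

    case GetCounter =>
      sender() ! counter
  }
}
```

Wrapping the reply in its own message type keeps it distinguishable from any other Int the actor might receive, and the Status.Failure case makes the timeout path explicit instead of silently stalling the loop.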

sourcedelica answered Sep 21 '22 17:09


The easiest way to implement such a design is to use "fire-and-forget" semantics:

class Accumulator extends Actor {
  private[this] var counter = 0

  def receive = {
    case Start => ActorTest.genRef ! Request
    case x: Int => {
      counter += x
      Log.info("Accumulator", "counter = " + counter)
      self ! Start
    }
  }
}

This solution is fully asynchronous, and you don't need any timeout.

paradigmatic answered Sep 20 '22 17:09