I read in the Akka docs that it's dangerous to close over variables from an enclosing actor.
Warning
In this case you need to carefully avoid closing over the containing actor’s reference, i.e. do not call methods on the enclosing actor from within the anonymous Actor class. This would break the actor encapsulation and may introduce synchronization bugs and race conditions because the other actor’s code will be scheduled concurrently to the enclosing actor.
Now, I have two actors, one of which requests something from the second and does something with the result. In this example below I have put together, actor Accumulator retrieves numbers from actor NumberGenerator and adds them up, reporting the sum along the way.
This can be done in at least two different ways as this example shows with two different receive functions (A vs B). The difference between the two is that A does not close over the counter variable; instead it awaits an integer and sums it up, while B creates a Future that closes over counter and does the sum. This happens within an anonymous actor created just to process onSuccess, if I understand properly how this works.
import com.esotericsoftware.minlog.Log
import akka.actor.{Actor, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import akka.util.duration._
case object Start
case object Request
object ActorTest {
var wake = 0
val accRef = Main.actorSystem.actorOf(Props[Accumulator], name = "accumulator")
val genRef = Main.actorSystem.actorOf(Props[NumberGenerator], name = "generator")
Log.info("ActorTest", "Starting !")
accRef ! Start
}
class Accumulator extends Actor {
var counter = 0
implicit val timeout = Timeout(5 seconds)
// A: WITHOUT CLOSURE
def receive = {
case Start => ask(ActorTest.genRef, Request).mapTo[Int] pipeTo self
case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start
}
// B: WITH CLOSURE
def receive = {
case Start => ask(ActorTest.genRef, Request).mapTo[Int] onSuccess {
case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start
}
}
}
class NumberGenerator extends Actor {
val rand = new java.util.Random()
def receive = {
case Request => sender ! rand.nextInt(11)-5
}
}
Is it absolutely evil to use closures in this case ? Of course I could use an AtomicInteger instead of an Int, or in some networking scenario using, say, netty, issue a write operation on a threadsafe channel, but this is not my point here.
To the risk of asking the ridiculous: is there a way for the Future's onSuccess to execute in this actor instead of an anonymous middle actor, without defining a case in the receive function ?
EDIT
To put it more clearly, my question is: Is there a way to force a series of Futures to run in the same thread as a given Actor ?
Introduction. In the Scala Standard Library, a Future is a data structure used to retrieve the result of some concurrent operation. This result can be accessed synchronously (blocking) or asynchronously (non-blocking). To be able to use this from Java, Akka provides a java friendly interface in akka.
The good news is that Akka actors conceptually each have their own light-weight thread, which is completely shielded from the rest of the system. This means that instead of having to synchronize access using locks you can write your actor code without worrying about concurrency at all.
A Future is a placeholder object for a value that may not yet exist. Generally, the value of the Future is supplied concurrently and can subsequently be used. Composing concurrent tasks in this way tends to result in faster, asynchronous, non-blocking parallel code.
What is an Actor in Akka? An actor is essentially nothing more than an object that receives messages and takes actions to handle them. It is decoupled from the source of the message and its only responsibility is to properly recognize the type of message it has received and take action accordingly.
The problem is that the onSuccess
is going to run in a different thread than the thread the actor's receive
is going to run in. You could use the pipeTo
approach, or use an Agent. Making counter
an AtomicInteger
would solve the problem, but it's not so clean - that is, it breaks the Actor model.
The easiest way of implementing such design is by using "fire-and-forget" semantic:
class Accumulator extends Actor {
private[this] var counter = 0
def receive = {
case Start => ActorTest.genRef ! Request
case x: Int => {
counter += x
Log.info("Accumulator", "counter = " + counter)
self ! Start
}
}
}
This solution is fully asynchronous, and you don't need any timeout.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With