I've been using the Scala bindings for RxJava for some time now, and am thinking about combining them with Akka actors. I would like to know if it's safe/possible to pass Rx Observables between Akka actors. For example, a program to print squares of even integers up to 20 (every second):
/* producer creates an observable and sends it to the worker */
object Producer extends Actor {
  // interval emits Long values, so convert to Int to match the declared type
  val toTwenty: Observable[Int] = Observable.interval(1 second).take(20).map(_.toInt)
  def receive = {
    case o: Observable[Int] =>
      o.subscribe(n => println(n))
  }
  worker ! toTwenty
}
/* worker which returns squares of even numbers */
object Worker extends Actor {
  def receive = {
    case o: Observable[Int] =>
      // parenthesised so the transformed Observable is what gets sent;
      // x * x squares the value, whereas ^ is bitwise XOR in Scala
      sender ! (o filter { _ % 2 == 0 } map { x => x * x })
  }
}
(Please treat this as pseudo-code; it doesn't compile.) Note that I am sending Observables from one actor to another. I'd like to understand:

An Observable can't be sent over a distributed system: it's a reference to an object in local memory. However, would it work locally?

In this example the work is tied to the subscribe call in the Producer. Could I split the work so that it was done on each actor separately?

Digression: I've seen some projects that look to combine Rx and actors:
http://jmhofer.johoop.de/?p=507 and https://github.com/jmhofer/rxjava-akka
But these are different in that they don't simply pass the Observable as a message between actors. They first call subscribe() to get the values, then send these to an actor's mailbox, and create a new Observable out of this. Or am I mistaken?
Your approach is not a good idea. The main idea behind Akka is that messages are sent to the mailbox of an actor and the actor processes them sequentially, one message at a time. This way two threads can never access the state of an actor at the same moment, so no concurrency issues can arise.
In your case, you call subscribe on the Observable. Your onNext callback will likely execute on another thread, so it is suddenly possible for two threads to access the state of your actor. You therefore have to be very careful about what you do within the callback. This also explains your last observation about the other implementations: they seem to grab the value within onNext and send it on as a message. You must not change the internal state of an actor within such a callback. Send a message to the same actor instead. This way sequential, one-at-a-time processing is guaranteed again.
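A minimal sketch of that advice, assuming classic Akka actors and the RxScala bindings (rx.lang.scala). The names SquarePrinter, Next, Failed and Done are hypothetical and exist only for this illustration. The Rx callbacks do nothing except send messages to the actor, and the mutable counter is touched only inside receive:

```scala
import akka.actor.{Actor, ActorSystem, Props}
import rx.lang.scala.{Observable, Subscription}
import scala.concurrent.duration._

// Hypothetical message types, introduced only for this sketch.
final case class Next(value: Long)
final case class Failed(error: Throwable)
case object Done

class SquarePrinter(source: Observable[Long]) extends Actor {
  // Actor state: read and written only inside receive.
  private var count = 0
  private var subscription: Option[Subscription] = None

  override def preStart(): Unit = {
    val me = self // an ActorRef may be used safely from any thread
    subscription = Some(
      source.filter(_ % 2 == 0).map(x => x * x).subscribe(
        (v: Long) => me ! Next(v),        // may run on an Rx thread: only send
        (e: Throwable) => me ! Failed(e),
        () => me ! Done
      )
    )
  }

  // Stop receiving callbacks once the actor is gone.
  override def postStop(): Unit = subscription.foreach(_.unsubscribe())

  def receive = {
    case Next(v) =>
      count += 1 // safe: messages are processed one at a time
      println(s"square #$count: $v")
    case Failed(e) =>
      println(s"stream failed: ${e.getMessage}")
      context.stop(self)
    case Done =>
      context.stop(self)
  }
}

object Main extends App {
  val system = ActorSystem("rx-akka-sketch")
  system.actorOf(Props(new SquarePrinter(Observable.interval(1.second).take(20))))
}
```

The Observable is handed to the actor's constructor here rather than sent as a message, but the same rule applies either way: whatever thread runs onNext, it should only forward the value to a mailbox.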