Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is it safe to pass an RX Observable to an actor (scala)?

I've been using scala bindings for RX Java for some time now, and am thinking about combining this with Akka Actors. I would like to know if it's safe/possible to pass RX Observables between Akka Actors. For example, a program to print squares of even integers up to 20 (every second):

/* producer creates an observable and sends it to the worker */
object Producer extends Actor {
  val toTwenty : Observable[Int] = Observable.interval(1 second).take(20)

  def receive = {
    case o : Observable[Int] =>
      o.subscribe( onNext => println )
  }

  worker ! toTwenty
}


/* worker which returns squares of even numbers */
object Worker extends Actor {
  def receive = {
    case o : Observable[Int] => 
       sender ! o filter { _ % 2 == 0 } map { _^2 }
  }
}

(Please treat this as pseudo-code; It doesn't compile). Note I am sending Observables from one actor to another. I'd like to understand :

  • Would Akka and RX synchronize access to the Observable automatically?
  • The Observable can't be sent over a distributed system - it's a reference to an object in local memory. However, would it work locally?
  • Suppose in this trivial example, work would be scheduled on the subscribe call in the Producer. Could I split the work so that it was done on each actor separately?

Digression : I've seen some projects that look to combine RX and Actors:

http://jmhofer.johoop.de/?p=507 and https://github.com/jmhofer/rxjava-akka

But these are different in that they don't simply pass the Observable as a message between actors. They first call subscribe() to get the values , then send these to an actors mailbox, and create a new Observable out of this. Or am I mistaken?

like image 623
Luciano Avatar asked Mar 01 '14 16:03

Luciano


1 Answers

Your approach is not a good idea. The main idea behind Akka is that messages are sent to the mailbox of an actor and the actor processes them sequentially (on one Thread). This way it is not possible that 2 Threads access the state of an actor and no concurrency issues can arise.

In your case, you use subscribe on the Observable. Your onNext callback will likely execute on another Thread. Therefore it is suddenly possible that 2 Threads can access the state of your actor. So you have to be really cautious what you do within your callback. This is the reason for your last observation of other implementations. Those implementations seem to grab the value within onNext and send this value as a message. You must not change the internal state of an actor within such a callback. Send a message to the same actor instead. This way sequential processing on one thread is guaranteed again.

like image 107
mavilein Avatar answered Nov 24 '22 01:11

mavilein