
RxJava thread-safety

Is this code thread-safe?

Observable<String> observable = ... // some observable that calls
                                    // onNext from a background thread

observable
  .scan(new ArrayList<String>(), (List<String> acc, String next) -> {
    acc.add(next);
    return acc;
  })
  .subscribe( list -> {
    // do something with the sequence of lists
    ...
  });

I'm curious because ArrayList is not a thread-safe data structure.

ZhekaKozlov asked Sep 29 '13 17:09



2 Answers

As a quick answer: in .NET (the original Rx implementation), all values from an observable sequence can be assumed to arrive sequentially. This does not preclude the sequence from being multi-threaded. However, if you are producing values from multiple threads, you may want to enforce the sequential nature by looking for the RxJava equivalent of the .NET Synchronize() Rx operator.

Another option is to check the implementation of scan in the RxJava source code and confirm that it enforces the sequential delivery your accumulator function relies on.
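To make the idea of a Synchronize()-style operator concrete, here is a minimal plain-Java sketch of what such a wrapper does. It is not RxJava code and not how RxJava implements anything: SerializedSketch, serialized and accumulate are hypothetical names made up for illustration, and a simple lock stands in for the operator. (In later RxJava releases the corresponding operator is, as far as I know, serialize(); check the docs for your version.)

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class SerializedSketch {

    // Hypothetical helper (not an RxJava API): lets only one caller at a time
    // into the wrapped callback, so concurrent producers cannot overlap.
    static <T> Consumer<T> serialized(Consumer<T> downstream) {
        final Object lock = new Object();
        return value -> {
            synchronized (lock) {
                downstream.accept(value);
            }
        };
    }

    // Starts `producers` threads that each push `perProducer` strings into one
    // plain ArrayList through the serialized callback; returns the final size.
    static int accumulate(int producers, int perProducer) {
        final List<String> acc = new ArrayList<>();
        final Consumer<String> add = value -> acc.add(value);
        final Consumer<String> onNext = serialized(add);

        List<Thread> workers = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            final int id = p;
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    onNext.accept("p" + id + "-" + i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // join() makes the workers' writes visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return acc.size();
    }

    public static void main(String[] args) {
        System.out.println(accumulate(4, 1000)); // prints 4000
    }
}
```

Without the serialized(...) wrapper, the four threads would call ArrayList.add concurrently and the result would be undefined (lost elements or an exception), which is the situation a multi-threaded producer puts the accumulator in.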

Lee Campbell answered Nov 15 '22 22:11


If this code isn't thread-safe, then either RxJava is broken or your Observable source is broken - operators being non-reentrant is part of the Rx contract.
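A rough way to picture that contract, as a plain-Java sketch rather than actual RxJava code: the Scan class below is a hypothetical stand-in for the scan operator, and the source is modelled as a single background thread that calls it one value at a time. Because the calls never overlap, the unsynchronized ArrayList is only ever touched by one call at a time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;

final class ContractSketch {

    // Hypothetical stand-in for scan (not RxJava's implementation): keeps the
    // accumulator and forwards it downstream after every incoming value.
    static final class Scan<T, R> implements Consumer<T> {
        private R acc; // only touched inside accept(), which is never called concurrently here
        private final BiFunction<R, T, R> step;
        private final Consumer<R> downstream;

        Scan(R seed, BiFunction<R, T, R> step, Consumer<R> downstream) {
            this.acc = seed;
            this.step = step;
            this.downstream = downstream;
        }

        @Override
        public void accept(T next) {
            acc = step.apply(acc, next);
            downstream.accept(acc);
        }
    }

    // Emits the values one after another from a single background thread and
    // records the list size the "subscriber" sees on each emission.
    static List<Integer> run(List<String> values) {
        final List<Integer> sizesSeen = new ArrayList<>();
        final List<String> seed = new ArrayList<>();
        final Scan<String, List<String>> scan = new Scan<String, List<String>>(
            seed,
            (acc, next) -> { acc.add(next); return acc; },
            list -> sizesSeen.add(list.size())); // sizes, since the same list instance is passed each time

        Thread background = new Thread(() -> {
            for (String value : values) {
                scan.accept(value); // sequential, non-overlapping calls
            }
        });
        background.start();
        try {
            background.join(); // makes the background thread's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sizesSeen;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b", "c"))); // prints [1, 2, 3]
    }
}
```

The sketch only holds as long as the source behaves like the loop above. A source that called accept from two threads at once would break the contract, and the ArrayList would no longer be safe.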

Ana Betts answered Nov 15 '22 22:11