Is this code thread-safe?
    Observable<String> observable = ... // some observable that calls
                                        // onNext from a background thread
    observable
        .scan(new ArrayList<String>(), (List<String> acc, String next) -> {
            acc.add(next);
            return acc;
        })
        .subscribe(list -> {
            // do something with the sequence of lists
            ...
        });
I'm curious because ArrayList is not a thread-safe data structure.
TL;DR: most RxJava operators and Subjects are NOT thread-safe.
RxJava: Multi-Threading in Android.
By default, Rx is single-threaded, which means that an Observable and the chain of operators applied to it notify observers on the same thread on which subscribe() is called.
As a quick answer, in .NET (the original Rx implementation) all values from an observable sequence can be assumed to be sequential. This does not preclude the sequence from being multi-threaded. However, if you are producing values from multiple threads, you may want to enforce the sequential nature by looking for the RxJava equivalent of the .NET Synchronize() Rx operator.
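To make the idea of a synchronizing operator concrete, here is a minimal sketch in plain Java, without RxJava on the classpath. It only illustrates the concept: several producer threads push values, and a wrapper makes sure the calls never overlap, so a plain ArrayList accumulator stays consistent. The names SerializedScanSketch, serialized and runDemo are made up for this example. In RxJava itself the corresponding operator is, as far as I know, serialize() on Observable (and toSerialized() on Subject), which is implemented differently from the simple lock used here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SerializedScanSketch {

    // Hypothetical helper: wraps a consumer so that calls to accept() never overlap.
    static <T> Consumer<T> serialized(Consumer<T> downstream) {
        final Object lock = new Object();
        return value -> {
            synchronized (lock) {
                downstream.accept(value);
            }
        };
    }

    // Starts `threads` producers that each emit `perThread` items,
    // waits for all of them, and returns the size of the accumulated list.
    static int runDemo(int threads, int perThread) {
        List<String> acc = new ArrayList<>();      // not thread-safe on its own
        Consumer<String> unsafeOnNext = acc::add;  // what the accumulator does
        Consumer<String> onNext = serialized(unsafeOnNext);

        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int id = t;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    onNext.accept("t" + id + "-" + i);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // join also makes the workers' writes visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return acc.size();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 10_000)); // prints 40000
    }
}
```

If you pass unsafeOnNext to the producers directly instead of the serialized wrapper, the final size is no longer guaranteed, and the ArrayList may even throw while resizing.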
Another option is to check the implementation of Scan in the RxJava source code to confirm that it enforces the sequential behavior you expect, since that is what keeps your accumulator function safe.
If this code isn't thread-safe, then either RxJava is broken or your Observable source is broken - operators being non-reentrant is part of the Rx contract.
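The contract mentioned above can be illustrated with a small sketch in plain Java. This is not RxJava's Scan implementation. It is a hypothetical scan-like class (ScanContractSketch.Scan) that assumes its onNext is never called concurrently and fails loudly if a source violates that assumption. With a well-behaved source that emits sequentially from one background thread, the ArrayList accumulator from the question works without any extra locking.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;

class ScanContractSketch {

    // Hypothetical, minimal scan-like accumulator (not RxJava's implementation).
    static final class Scan<T, R> {
        private final BiFunction<R, T, R> accumulator;
        private final AtomicBoolean inOnNext = new AtomicBoolean(false);
        private R state;

        Scan(R seed, BiFunction<R, T, R> accumulator) {
            this.state = seed;
            this.accumulator = accumulator;
        }

        // Relies on the contract: calls must not overlap. The guard only detects violations.
        R onNext(T value) {
            if (!inOnNext.compareAndSet(false, true)) {
                throw new IllegalStateException("overlapping onNext calls: the source breaks the contract");
            }
            try {
                state = accumulator.apply(state, value);
                return state;
            } finally {
                inOnNext.set(false);
            }
        }
    }

    // Emits all items sequentially from one background thread and returns the accumulated list.
    static List<String> collectOnBackgroundThread(List<String> items) {
        List<String> seed = new ArrayList<>();
        Scan<String, List<String>> scan = new Scan<>(seed, (acc, next) -> {
            acc.add(next);
            return acc;
        });
        Thread producer = new Thread(() -> items.forEach(scan::onNext));
        producer.start();
        try {
            producer.join(); // makes the producer's writes visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seed;
    }

    public static void main(String[] args) {
        System.out.println(collectOnBackgroundThread(Arrays.asList("a", "b", "c"))); // prints [a, b, c]
    }
}
```

The point of the guard is that thread safety here is the responsibility of the source: the accumulator does nothing special, and any concurrent call is treated as a bug upstream rather than something the operator should tolerate.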