With the Rx Subject, is it thread-safe to call OnNext() from multiple threads?
That way the sequence could be generated from multiple sources.
Would Merge do the same thing?
TL;DR: most RxJava operators and Subjects are NOT thread-safe.
Thread safety becomes a concern as soon as there is at least one entry point that multiple threads can reach. If a piece of code is accessed by multiple threads and calls other methods or classes, then the whole call tree beneath it becomes vulnerable.
Thread safety is a computer programming concept applicable to multi-threaded code. Thread-safe code only manipulates shared data structures in a manner that ensures that all threads behave properly and fulfill their design specifications without unintended interaction.
The Rx contract requires that notifications be sequential, and this is a logical necessity for several operators. A plain Subject does not enforce that for you, but you can use the available Synchronize methods to get this behaviour.
var subject = new Subject<int>();
var syncedSubject = Subject.Synchronize(subject);
You can now make concurrent calls to syncedSubject.
For an observer which must be synchronized, you can also use:
var observer = Observer.Create<Unit>(...);
var syncedObserver = Observer.Synchronize(observer);
Test:
Func<int, Action> onNext = i => () => syncedSubject.OnNext(i);
Parallel.Invoke(
    onNext(1),
    onNext(2),
    onNext(3),
    onNext(4)
);
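The test above only fires four calls and does not check anything. A minimal sketch of how the effect of Subject.Synchronize could be measured follows, assuming the System.Reactive NuGet package is referenced. The class name SynchronizeDemo, the method CountOverlaps, and its synchronize flag are hypothetical names used only for illustration.

```csharp
using System;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of Rx: counts how many observer callbacks
// started while another callback was still running.
public static class SynchronizeDemo
{
    public static int CountOverlaps(bool synchronize)
    {
        var subject = new Subject<int>();

        // Either the raw subject or the wrapper returned by Subject.Synchronize.
        ISubject<int> target;
        if (synchronize)
            target = Subject.Synchronize(subject);
        else
            target = subject;

        int inFlight = 0;   // callbacks currently executing
        int overlaps = 0;   // callbacks that found another one already running

        using (subject.Subscribe(_ =>
        {
            if (Interlocked.Increment(ref inFlight) > 1)
                Interlocked.Increment(ref overlaps);
            Thread.SpinWait(100);   // widen the window for a possible overlap
            Interlocked.Decrement(ref inFlight);
        }))
        {
            // Push values from many thread-pool threads at once.
            Parallel.For(0, 10000, i => target.OnNext(i));
        }

        return Volatile.Read(ref overlaps);
    }
}
```

With synchronize set to true, the count should stay at 0, because the wrapper serializes the calls before they reach the underlying subject. With false, the count depends on timing and on the number of cores, so it may or may not be greater than 0 on any given run.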
No, sequences are meant to be sequential, hence no overlapping notifications are allowed. You can use the Synchronize extension methods to enforce proper synchronization. Operators like Merge take a lock before calling the downstream observer in order to ensure proper serial invocation of the On* callbacks.
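The claim about Merge can be checked in a similar way. The following is a sketch under assumptions, not a definitive test: it assumes the System.Reactive package, uses two Observable.Range sources scheduled on TaskPoolScheduler so that they emit from thread-pool threads, and the names MergeDemo and SawOverlap are hypothetical.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

// Hypothetical helper, not part of Rx: reports whether the observer of a
// merged sequence was ever entered by two threads at the same time.
public static class MergeDemo
{
    public static bool SawOverlap()
    {
        // Two independent sources, each producing values on the thread pool.
        var first = Observable.Range(0, 5000, TaskPoolScheduler.Default);
        var second = Observable.Range(100000, 5000, TaskPoolScheduler.Default);

        int inFlight = 0;     // callbacks currently executing
        int overlapped = 0;   // set to 1 if two callbacks ever ran together

        using (var done = new ManualResetEventSlim())
        using (first.Merge(second).Subscribe(
            _ =>
            {
                if (Interlocked.Increment(ref inFlight) > 1)
                    Interlocked.Exchange(ref overlapped, 1);
                Thread.SpinWait(100);   // widen the window for a possible overlap
                Interlocked.Decrement(ref inFlight);
            },
            () => done.Set()))          // OnCompleted fires after both sources finish
        {
            done.Wait();
        }

        return Volatile.Read(ref overlapped) == 1;
    }
}
```

Because Merge serializes the notifications it forwards, SawOverlap() is expected to return false even though the two sources run concurrently.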