I created a Subject
instance in RxJava and call its onNext()
from multiple threads:
PublishSubject<String> subject = PublishSubject.create();
//...
subject.onNext("A"); //thread A
subject.onNext("B"); //thread B
The RxJava documentation says that:
take care not to call its
onNext( )
method (or its other on methods) from multiple threads, as this could lead to non-serialized calls, which violates the Observable contract and creates an ambiguity in the resultingSubject
.
toSerialized()
on such Subject
assuming I don't care if "A"
goes before or after "B"
? How would serialization help?Subject
thread-safe anyway or will I break RxJava without toSerialized()
?Do I have to call toSerialized() on such Subject assuming I don't care if "A" goes before or after "B"?
Yep use toSerialized()
because all operators applied to the subject assume that proper serialization is occurring upstream. The stream may fail or produce unexpected results if this does not happen.
Is Subject thread-safe anyway or will I break RxJava without toSerialized()?
answered above
What is the "Observable contract" that the documentation mentions?
Rx Design Guidelines.pdf section 4 defines the Observable contract:
4.2. Assume observer instances are called in a serialized fashion
As Rx uses a push model and .NET supports multithreading, it is possible for different messages to arrive different execution contexts at the same time. If consumers of observable sequences would have to deal with this in every place, their code would need to perform a lot of housekeeping to avoid common concurrency problems. Code written in this fashion would be harder to maintain and potentially suffer from performance issues.
I think RxJava documentation should make this more discoverable so I'll raise an issue.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With