I have some code using Rx, called from multiple threads, that does:
subject.OnNext(value); // where subject is Subject<T>
I want the values to be processed in the background, so my subscription is:
subscription = subject.ObserveOn(Scheduler.TaskPool).Subscribe(value =>
{
// use value
});
I don't really care which threads handle values coming out of the Observable, as long as the work is put into the TaskPool and doesn't block the current thread. However, my use of 'value' inside my OnNext delegate is not thread safe. At the moment, if a lot of values are going through the Observable I'm getting overlapping calls to my OnNext handler.
I could just add a lock to my OnNext delegate, but that doesn't feel like the Rx way of doing things. What's the best way to make sure I only have one call to my OnNext handler at a time when multiple threads are calling subject.OnNext(value)?
From "Using Subjects" on MSDN:
By default, subjects do not perform any synchronization across threads. [...] If, however, you want to synchronize outgoing calls to observers using a scheduler, you can use the Synchronize method to do so.
So you should, as Brandon says in the comments, synchronize the subject and hand the synchronized subject out to your producer threads, e.g.
var syncSubject = Subject.Synchronize(subject);
// syncSubject.OnNext(value) can be used from multiple threads
subscription = syncSubject.ObserveOn(TaskPoolScheduler.Default).Subscribe(value =>
{
// use value
});
I think you are looking for the .Synchronize() extension method. To get performance improvements in a release in late 2011, the Rx team relaxed the assumptions about the sequential nature of observable sequence producers. It seems you are breaking those assumptions (not a bad thing), but to get Rx back to behaving as users would expect, you should Synchronize the sequence to ensure it is sequential again.
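For illustration, here is a minimal sketch of that approach, assuming the System.Reactive package is referenced. The names SynchronizeDemo, Run, total, active, and overlaps are made up for this example and are not from the question. It pushes values from several threads into a plain Subject<int>, applies .Synchronize() before ObserveOn, and counts how often the handler is entered while another call is still running.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

public static class SynchronizeDemo
{
    // Hypothetical helper: returns how many values were handled and how many
    // times the handler was entered while another invocation was still active.
    public static (int handled, int overlaps) Run(int total)
    {
        var subject = new Subject<int>();
        int active = 0, overlaps = 0, handled = 0;
        var done = new ManualResetEventSlim();

        using (subject
            .Synchronize()                          // serialize concurrent OnNext calls
            .ObserveOn(TaskPoolScheduler.Default)   // hand the work to the task pool
            .Subscribe(value =>
            {
                if (Interlocked.Increment(ref active) > 1)
                    Interlocked.Increment(ref overlaps);

                // use value

                Interlocked.Decrement(ref active);
                if (Interlocked.Increment(ref handled) == total)
                    done.Set();
            }))
        {
            // Many producer threads calling OnNext on the raw subject.
            Parallel.For(0, total, i => subject.OnNext(i));
            done.Wait();
        }

        return (handled, overlaps);
    }

    public static void Main()
    {
        var result = Run(4000);
        Console.WriteLine($"handled={result.handled} overlaps={result.overlaps}");
    }
}
```

The difference from Subject.Synchronize(subject) is only where the gate sits: here the producers still call the raw subject and the sequence is serialized downstream, whereas Subject.Synchronize wraps the subject itself so the producers call the synchronized wrapper.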
Here is a bit more explanation of why to use Synchronize (see the second paragraph). On the other hand, Synchronize may contribute to a deadlock if you actively use locking in your own code; at least, I have witnessed such a situation.