Avoiding overlapping OnNext calls in Rx when using SubscribeOn(Scheduler.TaskPool)

I have some Rx code that is called from multiple threads and does:

subject.OnNext(value); // where subject is Subject<T>

I want the values to be processed in the background, so my subscription is

subscription = subject.ObserveOn(Scheduler.TaskPool).Subscribe(value =>
{
    // use value
});

I don't really care which threads handle values coming out of the Observable, as long as the work is put into the TaskPool and doesn't block the current thread. However, my use of 'value' inside my OnNext delegate is not thread safe. At the moment, if a lot of values are going through the Observable I'm getting overlapping calls to my OnNext handler.

I could just add a lock to my OnNext delegate, but that doesn't feel like the Rx way of doing things. What's the best way to make sure I only have one call to my OnNext handler at a time, when I have multiple threads calling subject.OnNext(value);?

Wilka asked Feb 06 '12 12:02


3 Answers

From "Using Subjects" on MSDN:

By default, subjects do not perform any synchronization across threads. [...] If, however, you want to synchronize outgoing calls to observers using a scheduler, you can use the Synchronize method to do so.

So you should, as Brandon says in the comments, synchronize the subject and hand the synchronized subject out to your producer threads. For example:

var syncSubject = Subject.Synchronize(subject);

// syncSubject.OnNext(value) can be used from multiple threads

subscription = syncSubject.ObserveOn(TaskPoolScheduler.Default).Subscribe(value =>
{
    // use value
});
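
To check that this really removes the overlap, here is a minimal sketch rather than a definitive implementation. It assumes the System.Reactive package and a compiler with tuple support; the class SynchronizeSketch, the method Run, and its parameters are hypothetical names invented for illustration. Several producer tasks push into the synchronized subject while the handler bumps a plain, unsynchronized counter and records whether two handler calls ever ran at the same time.

```csharp
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

public static class SynchronizeSketch
{
    // Hypothetical helper: pushes values from several producer tasks and reports
    // how many values the handler saw and whether two handler calls overlapped.
    public static (int Seen, bool Overlapped) Run(int producers, int valuesPerProducer)
    {
        var subject = new Subject<int>();
        var syncSubject = Subject.Synchronize(subject); // serializes incoming OnNext calls

        int seen = 0;       // deliberately not thread safe
        int active = 0;     // number of handler calls currently running
        int overlapped = 0; // set to 1 if two handler calls ever run at once
        var done = new TaskCompletionSource<bool>();

        using (syncSubject
            .ObserveOn(TaskPoolScheduler.Default) // handler runs off the producer threads
            .Subscribe(
                value =>
                {
                    if (Interlocked.Increment(ref active) > 1)
                        Interlocked.Exchange(ref overlapped, 1);
                    seen++; // the "not thread safe" use of value
                    Interlocked.Decrement(ref active);
                },
                () => done.SetResult(true)))
        {
            var tasks = Enumerable.Range(0, producers)
                .Select(p => Task.Run(() =>
                {
                    for (int i = 0; i < valuesPerProducer; i++)
                        syncSubject.OnNext(i);
                }))
                .ToArray();

            Task.WaitAll(tasks);
            syncSubject.OnCompleted(); // delivered after the queued values
            done.Task.Wait();          // wait until the handler has drained everything
        }

        return (seen, overlapped == 1);
    }
}
```

Calling SynchronizeSketch.Run(4, 500) should report 2000 values seen and no overlap. The producers only block for as long as it takes to enqueue a value, since the handler itself runs on the task pool.
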
Wilka answered Nov 01 '22 02:11


I think you are looking for the .Synchronize() extension method. To get performance improvements, in a release in late 2011 the Rx team relaxed the assumptions about the sequential nature of observable sequence producers. Your code breaks these assumptions (not a bad thing in itself), so to get Rx behaving again as users would expect, you should Synchronize the sequence to ensure it is sequential.
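
As a rough sketch of the extension-method form (assuming the System.Reactive package; the class SynchronizeOperatorSketch and the method Collect are hypothetical names for illustration only), Synchronize() is applied to the observable side and the producers keep calling the plain subject. The handler here adds to a List<int>, which is not safe for concurrent writers, so it only ends up with every value if the calls are serialized.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

public static class SynchronizeOperatorSketch
{
    // Hypothetical helper: pushes `count` values from parallel producers and
    // returns what the handler collected.
    public static List<int> Collect(int count)
    {
        var subject = new Subject<int>();
        var received = new List<int>(); // List<T>.Add is not safe for concurrent callers

        // Synchronize() serializes the notifications that reach the subscriber.
        using (subject.Synchronize().Subscribe(value => received.Add(value)))
        {
            // Producers call OnNext on the unsynchronized subject from many threads.
            Parallel.For(0, count, i => subject.OnNext(i));
        }

        return received;
    }
}
```

Note that without ObserveOn the handler runs on whichever producer thread pushed the value, so producers wait for each other while it runs. Adding ObserveOn after Synchronize() moves the work to the background, as in the question.
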

Lee Campbell answered Nov 01 '22 01:11


Here is a bit more explanation of why to use Synchronize (see the second paragraph). On the other hand, Synchronize may contribute to a deadlock if you actively use locking in your code; at least, I have witnessed such a situation.

pavel.baravik answered Nov 01 '22 00:11