Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Reactive Extensions: Concurrency within the subscriber

I'm trying to wrap my head around Reactive Extensions' support for concurrency and am having a hard time getting the results I'm after. So I may not get it just yet.

I have a source that emits data into the stream faster than the subscriber can consume it. I'd prefer to configure the stream such that another thread is used to invoke the subscriber for each new item from the stream, so that the subscriber has multiple threads running through it concurrently. I am able to ensure the thread-safeness of the subscriber.

The following sample demonstrates the problem:

Observable.Interval( TimeSpan.FromSeconds(1))
    .Do( x => Console.WriteLine("{0} Thread: {1} Source value: {2}",
                                DateTime.Now, 
                                Thread.CurrentThread.ManagedThreadId, x))
    .ObserveOn(NewThreadScheduler.Default)
    .Subscribe(x =>
               {
                   Console.WriteLine("{0} Thread: {1} Observed value: {2}",
                                     DateTime.Now,
                                     Thread.CurrentThread.ManagedThreadId, x);
                   Thread.Sleep(5000); // Simulate long work time
               });

The console output looks like this (dates removed):

4:25:20 PM Thread: 6 Source value: 0
4:25:20 PM Thread: 11 Observed value: 0
4:25:21 PM Thread: 12 Source value: 1
4:25:22 PM Thread: 12 Source value: 2
4:25:23 PM Thread: 6 Source value: 3
4:25:24 PM Thread: 6 Source value: 4
4:25:25 PM Thread: 11 Observed value: 1
4:25:25 PM Thread: 12 Source value: 5
4:25:26 PM Thread: 6 Source value: 6

Please notice the "Observed value" time deltas. The subscriber isn't invoked in parallel even though the source continues to emit data faster than the subscriber can process it. While I can imagine a bunch of scenarios where the current behavior would be useful, I need to be able to process the messages as soon as they become available.

I've tried several variations of Schedulers with the ObserveOn method, but none of them seem to do what I want.

Other than spinning off a thread within the Subscribe action to perform the long running work, is there anything I'm missing that will allow for concurrent delivery of data to the subscriber?

Thanks in advance for all answers and suggestions!

like image 277
foomonkey42 Avatar asked May 20 '13 21:05

foomonkey42


1 Answers

The fundamental problem here is that you want the Rx observable to dispatch events in a way that really breaks the rules of how observables work. I think it would be instructive to look at the Rx design guidelines here: http://go.microsoft.com/fwlink/?LinkID=205219 - most notably, "4.2 Assume observer instances are called in a serialized fashion". i.e. You're not meant to be able to run OnNext calls in parallel. In fact the ordering behaviour of Rx is pretty central to it's design philosophy.

If you look at the source, you'll see that Rx inforces this behaviour in the ScheduledObserver<T> class from which ObserveOnObserver<T> is derived... OnNexts are dispatched from an internal queue and each must complete before the next one is dispatched - within the given execution context. Rx won't allow an individual subscriber's OnNext calls to execute concurrently.

That's not to say you can't have multiple subscibers executing at different rates though. In fact this is easy to see if you change your code as follows:

var source = Observable.Interval(TimeSpan.FromSeconds(1))
    .Do(x => Console.WriteLine("{0} Thread: {1} Source value: {2}",
                                DateTime.Now,
                                Thread.CurrentThread.ManagedThreadId, x))
    .ObserveOn(NewThreadScheduler.Default);

var subscription1 = source.Subscribe(x =>
    {
        Console.WriteLine("Subscriber 1: {0} Thread: {1} Observed value: {2}",
                            DateTime.Now,
                            Thread.CurrentThread.ManagedThreadId, x);
        Thread.Sleep(1000); // Simulate long work time
    });

var subscription2 = source.Subscribe(x =>
{
    Console.WriteLine("Subscriber 2: {0} Thread: {1} Observed value: {2}",
                        DateTime.Now,
                        Thread.CurrentThread.ManagedThreadId, x);
    Thread.Sleep(5000); // Simulate long work time
});

Now you'll see Subscriber 1 getting ahead of Subscriber 2.

What you can't easily do is ask an observable to do something like dispatch of an OnNext call to a "ready" subscriber - which is kind of what you are asking for in a roundabout way. I also presume you wouldn't really want to create a new thread for every OnNext in a slow consumer situation!

In this scenario it sounds like you might be better off with a single subscriber that does nothing other than push work onto a queue as fast as possible, which is in turn serviced by a number of consuming worker threads you could then control as necessary to keep pace.

like image 139
James World Avatar answered Oct 20 '22 23:10

James World