Rx - consume each item on new thread [duplicate]

Let's assume I have such code:

static void Main(string[] args)
{
    var scheduler = NewThreadScheduler.Default;
    var enumerable = Enumerable.Range(0, 100);

    enumerable
        .ToObservable(scheduler)
        .SubscribeOn(scheduler)
        .Subscribe(item =>
        {
            Console.WriteLine("Consuming {0} on Thread: {1}", item, Thread.CurrentThread.ManagedThreadId);

            // simulate long running operation
            Thread.Sleep(1000);
        });

    Console.ReadKey();
}

As you can see, I convert an IEnumerable to an IObservable. I want to consume each item on a new thread, so I used SubscribeOn(scheduler). Unfortunately, every iteration runs on the same thread, so one iteration blocks the next.

The result is:

Consuming 0 on Thread: 4
Consuming 1 on Thread: 4
Consuming 2 on Thread: 4
Consuming 3 on Thread: 4
Consuming 4 on Thread: 4
....

Is it possible to force each item to be consumed on its own thread?

Lukas asked Sep 05 '17 12:09


1 Answer

The behaviour you are seeing is completely by design.

Fundamental to Rx is its grammar, which declares that a stream is defined as a sequence of zero or more OnNext calls followed by an optional OnError or OnCompleted.

In particular, Rx grammar dictates that each of these messages is delivered sequentially for a given subscriber.

So what you are seeing is the correct behaviour - no concurrent execution of OnNext handlers. Given this deliberate constraint, creating a new thread for each OnNext would be quite wasteful.

Under the covers, if you trace the code through far enough, you'll see that the NewThreadScheduler utilizes an EventLoopScheduler specifically to re-use the thread for each subscriber. The moniker NewThreadScheduler really speaks to the fact that each subscriber gets a new thread, not each event.

To see this, modify your code so that we have two subscribers running at different speeds. You'll see that each gets its own thread and proceeds at its own pace, and that the faster one is unimpeded by the slower:

var scheduler = NewThreadScheduler.Default;
var enumerable = Enumerable.Range(0, 100);

var xs = enumerable
    .ToObservable(scheduler)
    .SubscribeOn(scheduler);

xs.Subscribe(item =>
{
    Console.WriteLine("Slow consuming {0} on Thread: {1}",
        item, Thread.CurrentThread.ManagedThreadId);

    // simulate slower long running operation
    Thread.Sleep(1000);
});

xs.Subscribe(item =>
{
    Console.WriteLine("Fast consuming {0} on Thread: {1}",
        item, Thread.CurrentThread.ManagedThreadId);

    // simulate faster long running operation
    Thread.Sleep(500);
});

Console.ReadKey();

You may find reading through the Rx Design Guidelines quite helpful.

The desire to allow for concurrent processing of events in a subscriber suggests a queue with multiple consumers may be what you are after - and for that you could look outside of Rx, for example a BCL ConcurrentQueue<T>. It is also possible to project messages into asynchronous calls and gather results on completion without violating Rx grammar constraints.
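To make the queue suggestion concrete, here is a minimal sketch using only BCL types. It wraps a ConcurrentQueue<int> in a BlockingCollection<int> so that several consumer tasks can drain it concurrently. The class name QueueConsumersDemo, the helper ProcessWithConsumers, and the 100 ms delay are assumptions made for illustration, not part of the original question:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class QueueConsumersDemo
{
    // Hypothetical helper: queues itemCount integers, drains them with
    // consumerCount worker tasks, and returns how many items were processed.
    public static int ProcessWithConsumers(int itemCount, int consumerCount)
    {
        using (var queue = new BlockingCollection<int>(new ConcurrentQueue<int>()))
        {
            int processed = 0;

            // Each consumer blocks on the queue until an item arrives
            // or CompleteAdding() has been called and the queue is empty.
            var consumers = Enumerable.Range(0, consumerCount)
                .Select(id => Task.Factory.StartNew(() =>
                {
                    foreach (var item in queue.GetConsumingEnumerable())
                    {
                        Console.WriteLine("Consumer {0} handling {1} on Thread: {2}",
                            id, item, Thread.CurrentThread.ManagedThreadId);

                        // simulate long running operation
                        Thread.Sleep(100);
                        Interlocked.Increment(ref processed);
                    }
                }, TaskCreationOptions.LongRunning))
                .ToArray();

            // Producer: enqueue the work, then signal that no more is coming.
            foreach (var item in Enumerable.Range(0, itemCount))
            {
                queue.Add(item);
            }
            queue.CompleteAdding();

            Task.WaitAll(consumers);
            return processed;
        }
    }

    static void Main()
    {
        Console.WriteLine("Processed {0} items", ProcessWithConsumers(20, 4));
    }
}
```

Unlike an Rx subscriber, nothing here promises ordered or sequential delivery: items are handled by whichever consumer picks them up first.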

As an example of projecting into asynchronous calls, here's some similar code that processes each number in the stream for a random length of time. You can see the results come in out of order, unimpeded by each other. It's not awesome code, but it makes the point, and it could be genuinely useful if the async work were IO bound. Also note the use of Observable.Range, which avoids the Enumerable.Range().ToObservable() combo. Tested on .NET Core 2.0:

var random = new Random();

// stop the threadpool from throttling us as it grows
ThreadPool.SetMinThreads(100, 1);

Observable.Range(0, 100)
    .SelectMany(x =>
    {
        // Random is not thread-safe, so pick the delay here, where the
        // selector runs sequentially, rather than inside the pooled work
        var delay = random.Next(1, 10) * 1000;
        return Observable.Start(() =>
        {
            Console.WriteLine($"Started {x}");
            Thread.Sleep(delay);
            return x;
        });
    })
    .Subscribe(item =>
    {
        Console.WriteLine($"{item}, {Thread.CurrentThread.ManagedThreadId}");
    });

Console.ReadKey();
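
If letting every item run at once is too much, one possible variation caps how many are in flight. This is a sketch rather than tested production code: it assumes the System.Reactive package, and the class name BoundedConcurrencyDemo, the helper ProcessBounded, and the 200 ms delay are made up for illustration. Observable.Defer is used because Observable.Start begins work immediately; deferring it means the work only starts when Merge subscribes to that inner sequence.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;

static class BoundedConcurrencyDemo
{
    // Hypothetical helper: processes 0..count-1 with at most maxConcurrent
    // work items running at once; results are returned in completion order.
    public static IList<int> ProcessBounded(int count, int maxConcurrent)
    {
        return Observable.Range(0, count)
            // Defer postpones Observable.Start until the inner sequence is subscribed.
            .Select(x => Observable.Defer(() => Observable.Start(() =>
            {
                Console.WriteLine("Started {0} on Thread: {1}",
                    x, Thread.CurrentThread.ManagedThreadId);

                // simulate long running operation
                Thread.Sleep(200);
                return x;
            })))
            // Merge subscribes to at most maxConcurrent inner sequences at a time.
            .Merge(maxConcurrent)
            .ToList()
            // Block until the whole stream completes (fine for a console demo).
            .Wait();
    }

    static void Main()
    {
        var results = ProcessBounded(20, 4);
        Console.WriteLine("Completed {0} items", results.Count);
    }
}
```

The subscriber still sees its OnNext calls one at a time, so the Rx grammar is respected; only the projected work runs concurrently.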

James World answered Nov 13 '22 10:11