Let's assume I have code like this:
    static void Main(string[] args)
    {
        var scheduler = NewThreadScheduler.Default;
        var enumerable = Enumerable.Range(0, 100);

        enumerable
            .ToObservable(scheduler)
            .SubscribeOn(scheduler)
            .Subscribe(item =>
            {
                Console.WriteLine("Consuming {0} on Thread: {1}", item, Thread.CurrentThread.ManagedThreadId);
                // simulate long running operation
                Thread.Sleep(1000);
            });

        Console.ReadKey();
    }
As you can see, I convert an IEnumerable to an IObservable. I want to consume each item on a new thread, so I used SubscribeOn(scheduler). Unfortunately, every iteration runs on the same thread, so one iteration blocks the next.
The result is:
Consuming 0 on Thread: 4
Consuming 1 on Thread: 4
Consuming 2 on Thread: 4
Consuming 3 on Thread: 4
Consuming 4 on Thread: 4
....
Is it possible to force each item to be consumed on its own thread?
The behaviour you are seeing is completely by design.
Fundamental to Rx is its grammar, which declares that a stream is a sequence of zero or more OnNext calls followed by an optional OnError or OnCompleted.
In particular, Rx grammar dictates that each of these messages is delivered sequentially for a given subscriber.
So what you are seeing is the correct behaviour: no concurrent execution of OnNext handlers. Given this deliberate constraint, creating a new thread for each OnNext would be quite wasteful.
Under the covers, if you trace the code through far enough, you'll see that the NewThreadScheduler utilizes an EventLoopScheduler specifically to re-use the thread for each subscriber. The moniker NewThreadScheduler really speaks to the fact that each subscriber gets a new thread, not each event.
To see this, modify your code so that there are two subscribers running at different speeds. You'll see that each gets its own thread and proceeds at its own pace, and that the faster one is unimpeded by the slower:
    var scheduler = NewThreadScheduler.Default;
    var enumerable = Enumerable.Range(0, 100);

    var xs = enumerable
        .ToObservable(scheduler)
        .SubscribeOn(scheduler);

    xs.Subscribe(item =>
    {
        Console.WriteLine("Slow consuming {0} on Thread: {1}",
            item, Thread.CurrentThread.ManagedThreadId);
        // simulate slower long running operation
        Thread.Sleep(1000);
    });

    xs.Subscribe(item =>
    {
        Console.WriteLine("Fast consuming {0} on Thread: {1}",
            item, Thread.CurrentThread.ManagedThreadId);
        // simulate faster long running operation
        Thread.Sleep(500);
    });

    Console.ReadKey();
You may find that a read through the Rx Design Guidelines is quite helpful.
The desire to allow for concurrent processing of events in a subscriber suggests a queue with multiple consumers may be what you are after, and for that you could look outside of Rx, for example at a BCL ConcurrentQueue<T>. It is also possible to project messages into asynchronous calls and gather results on completion without violating Rx grammar constraints.
For example, here's some similar code that processes each number in the stream for a random length of time. You can see the results come in out of order, unimpeded by each other. It's not awesome code, but it makes the point. It could be genuinely useful if the async work were something IO bound. Also note the use of Observable.Range, which avoids the Enumerable.Range().ToObservable() combo. Tested on .NET Core 2.0:
    var random = new Random();
    var randomLock = new object();

    // stop the threadpool from throttling us as it grows
    ThreadPool.SetMinThreads(100, 1);

    Observable.Range(0, 100)
        .SelectMany(x => Observable.Start(() =>
        {
            Console.WriteLine($"Started {x}");
            int seconds;
            // Random is not thread-safe, so guard the shared instance
            lock (randomLock) { seconds = random.Next(1, 10); }
            Thread.Sleep(seconds * 1000);
            return x;
        }))
        .Subscribe(item =>
        {
            Console.WriteLine($"{item}, {Thread.CurrentThread.ManagedThreadId}");
        });

    Console.ReadKey();
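For the other option mentioned above, a queue with multiple consumers outside of Rx, here is a rough sketch using only the BCL. The class name `QueueConsumers`, the helper `Drain`, and its parameters are made up for illustration, and the sketch assumes all items are known up front so that each consumer can simply stop when the queue is empty. Treat it as one possible shape, not a definitive implementation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class QueueConsumers
{
    // Hypothetical helper: drains `items` with `consumerCount` dedicated threads and
    // returns (item, managed thread id) pairs in the order the work finished.
    public static List<(int Item, int ThreadId)> Drain(
        IEnumerable<int> items, int consumerCount, int workMs)
    {
        // Assumption: the queue is fully populated before any consumer starts.
        var queue = new ConcurrentQueue<int>(items);
        var results = new ConcurrentQueue<(int Item, int ThreadId)>();

        var threads = Enumerable.Range(0, consumerCount)
            .Select(_ => new Thread(() =>
            {
                // Each consumer pulls the next available item until none are left.
                while (queue.TryDequeue(out var item))
                {
                    // simulate long running operation
                    Thread.Sleep(workMs);
                    results.Enqueue((item, Thread.CurrentThread.ManagedThreadId));
                }
            }))
            .ToList();

        threads.ForEach(t => t.Start());
        threads.ForEach(t => t.Join());
        return results.ToList();
    }

    public static void Main()
    {
        foreach (var (item, threadId) in Drain(Enumerable.Range(0, 20), 4, 100))
        {
            Console.WriteLine($"Consumed {item} on Thread: {threadId}");
        }
    }
}
```

Unlike an Rx subscriber, nothing here guarantees ordering: items are handed to whichever consumer is free, so several are in flight at once. If items arrive over time rather than up front, a BlockingCollection<T> is probably a better fit than polling a ConcurrentQueue<T>.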