I have the following observable
IObservable<Work> observable = SomeMethodToGetWorkToDo();
Every time OnNext
is called on the above, I need the work done in a separate thread. Each work is going to take long to finish so I can't have other Work
items in the queue waiting so long there are enough system resources.
I thought ObserveOn
would solve my problem, but when I run a few Console.WriteLine
calls to see the thread IDs, I saw the same thread ID for each notification call.
How can I make sure of parallelism in OnNext
?
You need to transform the "work to do" into a series of "work is being done" tokens that you then collect. Easiest way is to use TPL to execute the work as a Task (which will end up running the Tasks on the Thread Pool). Something like this:
observable
.SelectMany(work => Task.Run(() => DoWork(work))
.Subscribe(workResult => Console.WriteLine("a work item was completed"));
ObserveOn
works best as follows:
var sample = Observable.Interval(TimeSpan.FromMilliseconds(100));
sample
.SelectMany(l => Observable.Return(l)
.ObserveOn(TaskPoolScheduler.Default /*or NewThreadScheduler.Default */)
)
.Select(l => Thread.CurrentThread.ManagedThreadId)
.Take(9)
.Subscribe(i => Console.WriteLine(i));
The trick is to get the different items to be processed on different threads, they have to be on different streams.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With