Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Notification parallelism in Observable

I have the following observable

IObservable<Work> observable = SomeMethodToGetWorkToDo();

Every time OnNext is called on the above, I need the work done in a separate thread. Each work is going to take long to finish so I can't have other Work items in the queue waiting so long there are enough system resources.

I thought ObserveOn would solve my problem, but when I run a few Console.WriteLine calls to see the thread IDs, I saw the same thread ID for each notification call.

How can I make sure of parallelism in OnNext?

like image 712
fahadash Avatar asked Feb 04 '23 13:02

fahadash


2 Answers

You need to transform the "work to do" into a series of "work is being done" tokens that you then collect. Easiest way is to use TPL to execute the work as a Task (which will end up running the Tasks on the Thread Pool). Something like this:

observable
    .SelectMany(work => Task.Run(() => DoWork(work))
    .Subscribe(workResult => Console.WriteLine("a work item was completed"));
like image 93
Brandon Avatar answered Feb 07 '23 03:02

Brandon


ObserveOn works best as follows:

var sample = Observable.Interval(TimeSpan.FromMilliseconds(100));

sample
    .SelectMany(l => Observable.Return(l)
        .ObserveOn(TaskPoolScheduler.Default /*or NewThreadScheduler.Default */)
    )
    .Select(l => Thread.CurrentThread.ManagedThreadId)
    .Take(9)
    .Subscribe(i => Console.WriteLine(i));

The trick is to get the different items to be processed on different threads, they have to be on different streams.

like image 23
Shlomo Avatar answered Feb 07 '23 05:02

Shlomo