Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Reactive Extensions (RX) Custom Scheduler C#

Is is possible to build a custom scheduler that can inspect the value of every element passing through the IObservable, in order to decide on which thread to process the item?

I have a requirement to process items with the same key sequentially but different keys in parallel. It would make sense to have RX do the scheduling rather than have to leave the observable earlier than I'd like in order to allocate each value to a thread.

like image 283
benthemos Avatar asked Aug 28 '15 10:08

benthemos


1 Answers

Have you tried GroupBy followed by ObserveOn?

Something like:

source
    .GroupBy(item => item.Key)
    .SelectMany(group => group
        .ObserveOn(Scheduler.NewThread)
        .Select(item => process(item))
    )
    .Subscribe(processResult => ...);

This will partition the stream by key, start a new thread for each key, and run process() for each item in that key.

like image 191
Brandon Avatar answered Sep 28 '22 23:09

Brandon