Is it possible to build a custom scheduler that can inspect the value of every element passing through the IObservable, in order to decide which thread should process the item?
I have a requirement to process items with the same key sequentially, but items with different keys in parallel. It would make sense to have Rx do the scheduling, rather than having to leave the observable earlier than I'd like in order to allocate each value to a thread.
Have you tried GroupBy followed by ObserveOn?
Something like:
source
    .GroupBy(item => item.Key)
    .SelectMany(group => group
        .ObserveOn(Scheduler.NewThread)
        .Select(item => process(item))
    )
    .Subscribe(processResult => ...);
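This partitions the stream by key, starts a new thread for each key, and runs process() on that thread for each item with that key. Because ObserveOn delivers notifications one at a time and in order, items sharing a key are processed sequentially, while items with different keys are processed in parallel.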
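For anyone who wants to try this end to end, here is a minimal, self-contained sketch of the same GroupBy / ObserveOn pattern. It assumes the System.Reactive package (Rx 2.0 or later), where NewThreadScheduler.Default is the non-obsolete equivalent of Scheduler.NewThread. The KeyedProcessing class, the Run method, the tuple item shape, and the Process body are hypothetical stand-ins for illustration, not part of the original question.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class KeyedProcessing
{
    // Hypothetical stand-in for process(): tags each item with the
    // id of the thread that handled it.
    static (string Key, int Value, int ThreadId) Process((string Key, int Value) item)
    {
        return (item.Key, item.Value, Thread.CurrentThread.ManagedThreadId);
    }

    // Runs every item through Process, one group per key, and blocks
    // until the whole source has been processed.
    public static IList<(string Key, int Value, int ThreadId)> Run(
        IEnumerable<(string Key, int Value)> items)
    {
        return items.ToObservable()
            // Partition the stream: one inner observable per key.
            .GroupBy(item => item.Key)
            // Move each group onto a new thread; ObserveOn keeps the
            // items of a group in their original order.
            .SelectMany(group => group
                .ObserveOn(NewThreadScheduler.Default)
                .Select(item => Process(item)))
            // Collect all results and wait for completion (demo only).
            .ToList()
            .Wait();
    }
}
```

Calling KeyedProcessing.Run with a mix of keys and inspecting ThreadId per key is a quick way to check which thread handled each group. In a long-running application you would normally Subscribe instead of blocking with Wait, and you may want to bound the number of threads if the set of keys is large.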