
Fetching data based on stream

We have a source that notifies of changes to data, and when an item comes in we asynchronously fetch the new data.

source.SelectMany(async n => { await FetchData(); });

While the data is loading, many more notifications may come in. We want to ignore all but one of them, so that we don't fetch data for every single notification but instead fetch only once more after the current fetch finishes.
How can we ignore all notifications coming in from source, except one, until the data is fetched?

I have a feeling the solution will involve converting FetchData() to an IObservable, but I still don't know which Rx primitive would allow us to combine the streams.

pauloya asked Feb 16 '26 03:02



1 Answer

This looks like a use case for a classic (but missing) Rx operator: ObserveLatestOn (a sample implementation is linked here, and you can find others on the web).

source.ObserveLatestOn(TimeSpan.Zero, NewThreadScheduler.Default).SelectMany(async n => { await FetchData(); });

Note that this implementation has only been tested on single-threaded schedulers (mostly the UI scheduler, though it will work with NewThread). It has not been tested with Immediate/CurrentThread (it may work) or TaskPool (where it likely has race conditions).

Note also that what you're hitting here is the lack of reactive pull backpressure in Rx.NET (in discussion here). RxJava has nice backpressure support for this case (e.g. onBackpressureLatest).
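If you would rather not depend on a separate operator, the "ignore everything but the latest while a fetch is in flight, then go once more" behaviour can also be folded into the projection itself. The following is only a minimal sketch under some assumptions, not the linked ObserveLatestOn implementation: SelectLatestAsync is a hypothetical name (it is not part of Rx.NET), and it assumes the fetch delegate returns a Task<TResult>.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class LatestOnlyExtensions
{
    // Hypothetical helper, not part of Rx.NET. While a fetch is in flight it
    // remembers only the most recent notification, then fetches once more for it.
    public static IObservable<TResult> SelectLatestAsync<T, TResult>(
        this IObservable<T> source, Func<T, Task<TResult>> fetch)
    {
        return Observable.Create<TResult>(observer =>
        {
            var gate = new object();
            var busy = false;        // a fetch is currently in flight
            var hasPending = false;  // a notification arrived during that fetch
            var completed = false;   // the source has completed
            T pending = default;     // the most recent such notification

            async Task DrainAsync(T item)
            {
                try
                {
                    while (true)
                    {
                        // Results may surface on a thread-pool thread.
                        var result = await fetch(item).ConfigureAwait(false);
                        observer.OnNext(result);

                        lock (gate)
                        {
                            if (hasPending)
                            {
                                // Go once more, for the latest notification only.
                                item = pending;
                                hasPending = false;
                                continue;
                            }
                            busy = false;
                            if (!completed) return;
                        }
                        // The source completed while we were fetching.
                        observer.OnCompleted();
                        return;
                    }
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                }
            }

            // Disposing the subscription stops new fetches; the result of a
            // fetch that is already running is simply dropped.
            return source.Subscribe(
                n =>
                {
                    lock (gate)
                    {
                        if (busy)
                        {
                            // Overwrite any earlier pending notification.
                            pending = n;
                            hasPending = true;
                            return;
                        }
                        busy = true;
                    }
                    _ = DrainAsync(n);
                },
                observer.OnError,
                () =>
                {
                    lock (gate)
                    {
                        completed = true;
                        if (busy) return; // DrainAsync will signal completion
                    }
                    observer.OnCompleted();
                });
        });
    }
}
```

Usage would then look like source.SelectLatestAsync(n => FetchData()), assuming FetchData returns a Task<TResult>. The lock only guards the small state machine, so this should be safe with notifications arriving on any thread, but I have not tested it across schedulers, and a source error racing with a running fetch is not strictly serialized here.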

Gluck answered Feb 18 '26 15:02



