I need to implement the following algorithm in Rx.NET:
1. Take the latest item from stream, or wait for a new item without blocking if there are no new items. Only the latest item matters; the others can be dropped.
2. Pass the item to SlowFunction and print the output.
The naive solution is:
let PrintLatestData (stream: IObservable<_>) =
    stream.Select(SlowFunction).Subscribe(printfn "%A")
However, this solution does not work because, on average, stream emits items faster than SlowFunction can consume them. Since Select does not drop items but instead processes every item in order from oldest to newest, the delay between an item being emitted and printed grows without bound as the program runs. Only the most recent item should be taken from the stream, to avoid this ever-growing backlog.
I searched the documentation and found a method called onBackpressureLatest in RxJava, which to my understanding would do what I described above. However, the method does not exist in Rx.NET. How can I implement this in Rx.NET?
I think you want to use something like ObserveLatestOn. It effectively replaces the queue of incoming events with a single value and a flag.
James World has blogged about it here http://www.zerobugbuild.com/?p=192
The concept is used heavily in GUI applications that can't control how fast the server pushes data at them.
You can also see an implementation in Reactive Trader https://github.com/AdaptiveConsulting/ReactiveTrader/blob/83a6b7f312b9ba9d70327f03d8d326934b379211/src/Adaptive.ReactiveTrader.Shared/Extensions/ObservableExtensions.cs#L64 and the supporting presentations explaining ReactiveTrader https://leecampbell.com/presentations/#ReactConfLondon2014
To be clear, this is a load-shedding algorithm, not a backpressure algorithm.
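The "single value and a flag" idea can be sketched without any Rx.NET dependency, using only the IObserver interface from the base class library. This is a minimal illustration of the load-shedding concept, not the ObserveLatestOn implementation from the linked posts: the LatestOnlyObserver type is hypothetical, it runs the consumer on the thread pool instead of taking a scheduler, and it ignores OnError and OnCompleted.

```fsharp
open System
open System.Threading

/// Hypothetical observer (not part of Rx.NET): keeps one pending value
/// and a flag instead of a queue, so a slow consumer only sees the newest item.
type LatestOnlyObserver<'T>(consume: 'T -> unit) =
    let gate = obj ()
    let mutable pending : 'T option = None   // the single-slot "queue"
    let mutable draining = false             // the flag: is a worker running?

    // Worker loop: keep taking the newest pending value until none is left.
    member private this.Drain() =
        let mutable go = true
        while go do
            let next =
                lock gate (fun () ->
                    let v = pending
                    pending <- None
                    if v.IsNone then draining <- false
                    v)
            match next with
            | Some v -> consume v        // the slow call runs outside the lock
            | None -> go <- false

    interface IObserver<'T> with
        member this.OnNext(value) =
            let startWorker =
                lock gate (fun () ->
                    pending <- Some value    // overwrite: any older pending item is dropped
                    if draining then
                        false
                    else
                        draining <- true
                        true)
            // Never blocks the producer: the consumer runs on the thread pool.
            if startWorker then
                Async.Start (async { this.Drain() })
        member this.OnError(_error) = ()     // ignored in this sketch
        member this.OnCompleted() = ()       // ignored in this sketch
```

With this in place, the pipeline from the question might become stream.Subscribe(LatestOnlyObserver<_>(SlowFunction >> printfn "%A")). At most one worker is active at a time, so outputs stay in order while intermediate items are shed.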
The sync/async suggestion may help slightly. But given that SlowFunction is always slower than the stream of events, making it async only lets you parallelise the handling (by observing on a thread pool), at the cost of eventually running out of threads or adding more latency through context switching. It doesn't sound like a solution to me.
I suggest you look at the open source Rxx 'Introspective' operators written by Dave Sexton. These can vary the buffer/throttle period from which you take the latest item as the queue backs up due to a slow consumer. If the slow function suddenly gets faster, it won't buffer things at all. If it gets slower, it will buffer more. You'll have to check whether there is a 'latest from' variant, or modify an existing operator to suit your needs, e.g. use the buffering one and take only the last item in each buffer, or enhance it further to store only the latest item internally. Google 'Rxx' and you'll find it on GitHub somewhere.
A simpler approach, if the running time of the slow function is fairly predictable, is to throttle your stream by an interval that exceeds this time. Obviously I don't mean the standard Rx 'Throttle', but a variant that lets the most recent update through instead of an old one. There are plenty of solutions to this sort of problem readily available on here.
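One Rx.NET operator that matches this description is Sample, which on each tick forwards the most recent item received since the previous tick and drops the rest. The answer does not name an operator, so treat this as one possible reading. The helper name latestPer and the period parameter are hypothetical, and the sketch assumes the period is longer than a typical SlowFunction call.

```fsharp
// Requires the System.Reactive package
// (in an F# script: #r "nuget: System.Reactive").
open System
open System.Reactive.Linq

/// Hypothetical helper: forwards at most one item per `period`,
/// always the newest one seen since the previous tick.
let latestPer (period: TimeSpan) (stream: IObservable<'a>) : IObservable<'a> =
    stream.Sample(period)

/// The question's pipeline with sampling placed in front of the slow step.
/// Assumption: `period` exceeds the usual running time of `slowFunction`.
let printLatestData (period: TimeSpan) (slowFunction: 'a -> 'b) (stream: IObservable<'a>) =
    (latestPer period stream)
        .Select(fun x -> slowFunction x)
        .Subscribe(fun y -> printfn "%A" y)
```

Unlike the single-slot approach, this relies on a fixed interval, so it wastes time when the slow function finishes early and gives no protection when a call takes much longer than expected.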