
Handling backpressure in Rx.NET without onBackpressureLatest

I need to implement the following algorithm in Rx.NET:

  1. Take the latest item from the stream or, if there are no new items, wait for one without blocking. Only the latest item matters; the others can be dropped.
  2. Pass the item to SlowFunction and print the output.
  3. Repeat from step 1.

The naive solution is:

let PrintLatestData (stream: IObservable<_>) =
    stream.Select(SlowFunction).Subscribe(printfn "%A")

However, this solution does not work because, on average, the stream emits items faster than SlowFunction can consume them. Since Select does not drop items but instead attempts to process every item in order from oldest to newest, the delay between an item being emitted and printed grows without bound as the program runs. Only the most recent item should be taken from the stream to avoid this ever-growing backlog.

I searched the documentation and found a method called onBackpressureLatest in RxJava, which, to my understanding, would do what I described above. However, the method does not exist in Rx.NET. How can I implement this in Rx.NET?

Steve asked Feb 14 '17 21:02


2 Answers

I think you want to use something like ObserveLatestOn. It effectively replaces the queue of incoming events with a single value and a flag.

James World has blogged about it here: http://www.zerobugbuild.com/?p=192

The concept is used heavily in GUI applications that can't trust how fast the server may push data at them.

You can also see an implementation in Reactive Trader: https://github.com/AdaptiveConsulting/ReactiveTrader/blob/83a6b7f312b9ba9d70327f03d8d326934b379211/src/Adaptive.ReactiveTrader.Shared/Extensions/ObservableExtensions.cs#L64

The supporting presentations explaining ReactiveTrader are here: https://leecampbell.com/presentations/#ReactConfLondon2014

To be clear, this is a load-shedding algorithm, not a backpressure algorithm.
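
To make the "single value and a flag" idea concrete, here is a minimal F# sketch. It is an illustration under stated assumptions, not the code from the linked posts: the names observeLatestOn and printLatestData are hypothetical, the System.Reactive package is assumed to be referenced, and slowFunction stands in for the question's SlowFunction.

```fsharp
open System
open System.Reactive
open System.Reactive.Concurrency
open System.Reactive.Disposables
open System.Reactive.Linq

/// Hypothetical helper (not part of Rx.NET): forwards notifications on
/// `scheduler`, keeping at most one pending notification at a time.
let observeLatestOn (scheduler: IScheduler) (source: IObservable<'T>) : IObservable<'T> =
    Observable.Create(Func<IObserver<'T>, IDisposable>(fun observer ->
        let gate = obj ()
        let pending = ref (None: Notification<'T> option) // the single value
        let active = ref false                            // the flag: drain scheduled or running
        let scheduled = new MultipleAssignmentDisposable()

        // Runs on the scheduler: deliver the stored notification, then
        // reschedule only if a newer one arrived in the meantime.
        let drain (self: Action) =
            let next =
                lock gate (fun () ->
                    let n = pending.Value
                    pending.Value <- None
                    n)
            match next with
            | Some n -> n.Accept(observer)
            | None -> ()
            let more =
                lock gate (fun () ->
                    active.Value <- pending.Value.IsSome
                    active.Value)
            if more then self.Invoke()

        // Runs on the producer's thread: overwrite the slot, never queue.
        let onNotification (n: Notification<'T>) =
            let mustStart =
                lock gate (fun () ->
                    pending.Value <- Some n
                    let wasIdle = not active.Value
                    active.Value <- true
                    wasIdle)
            if mustStart then
                scheduled.Disposable <- scheduler.Schedule(Action<Action>(drain))

        let subscription = source.Materialize().Subscribe(onNotification)
        new CompositeDisposable(subscription, scheduled :> IDisposable) :> IDisposable))

// Usage sketch: slowFunction runs on the task pool and only ever sees the newest item.
let printLatestData (slowFunction: 'a -> 'b) (stream: IObservable<'a>) : IDisposable =
    (observeLatestOn TaskPoolScheduler.Default stream)
        .Select(fun x -> slowFunction x)
        .Subscribe(printfn "%A")
```

Because the slot is overwritten rather than appended to, items that arrive while slowFunction is busy are simply dropped, which is why this counts as load shedding: the producer is never slowed down.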

Lee Campbell answered Sep 21 '22 04:09


The sync/async suggestion may help slightly. However, given that the slow function is always slower than the stream of events, making it async only lets you parallelise the handling (by observing on a thread pool), at the cost of eventually running out of threads or adding more latency through context switching. It doesn't sound like a solution to me.

I suggest you look at the open source Rxx 'Introspective' operators written by Dave Sexton. These can vary the buffer/throttle period from which you take the latest item as the queue backs up due to a slow consumer. If the slow function suddenly gets faster, they won't buffer at all; if it gets slower, they will buffer more. You'll have to check whether there is a 'latest from' variant, or modify an existing operator to suit your needs, e.g. use a buffer and take only its last item, or go further and store only the latest item internally. Google 'Rxx' and you'll find it on GitHub somewhere.

A simpler approach, if the running time of the slow function is fairly predictable, is to throttle your stream by an interval that exceeds this time. Obviously I don't mean the standard Rx 'Throttle', but an operator that lets a more recent update through instead of an old one. There are plenty of solutions to this sort of problem readily available on here.
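
As a rough illustration of that fixed-interval idea, one built-in Rx.NET operator that passes on only the most recent item per interval is Sample. The sketch below assumes System.Reactive is referenced; the wrapper name sampleLatest and the two-second period are hypothetical and should be tuned to exceed the slow function's typical running time.

```fsharp
open System
open System.Reactive.Linq

/// Hypothetical wrapper: at each tick of `period`, emit the most recent item
/// seen since the previous tick (nothing is emitted if no item arrived).
let sampleLatest (period: TimeSpan) (stream: IObservable<'T>) : IObservable<'T> =
    stream.Sample(period)

// Usage sketch: slowFunction stands in for the question's SlowFunction.
let printSampled (slowFunction: 'a -> 'b) (stream: IObservable<'a>) : IDisposable =
    (sampleLatest (TimeSpan.FromSeconds(2.0)) stream)
        .Select(fun x -> slowFunction x)
        .Subscribe(printfn "%A")
```

This trades some latency for simplicity: an item may wait up to one period before it is processed, and the approach only holds up while the slow function really does finish within the chosen period.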

H_Andr answered Sep 24 '22 04:09