I'm coming from C#, where (as far as I can recall) Rx under backpressure would keep adding items to internal queues until the application ran out of memory.
RxJava appears to have taken a different stance: it throws exceptions when backpressure starts to build.
This means that I have to use something like onBackpressureBuffer() and, in the call to subscribe(), pass in a Subscriber<? super T> which makes requests up the stream to release the pressure.
Maybe it's because I am used to the Rx.NET approach to this, but it seems mental to me.
Firstly, have I understood this correctly?
Secondly, is there any way I can "disable" this feature so that it behaves the same way as Rx.NET? I don't want to complicate my subscribe() call by having to check whether I have applied one of these backpressure operators in order to know whether I have to call request() or not.
In Scala (I don't know the Java syntax, but the method calls will be the same), you just need to turn
fastHotObservable.subscribe(next => slowFunction(next))
into
fastHotObservable.onBackpressureBuffer.subscribe(next => slowFunction(next))
That should do it. Of course, at runtime there will have to be some periods of inactivity, so that the process occasionally has time to catch up and work through the buffered elements.
I don't think it's mental. I find it nice that you can choose the strategy for handling backpressure yourself instead of being forced into one selected for you. I also prefer having to specify it explicitly.
In fact, the strategy Rx.NET uses is not always the best. I have been using a few onBackpressureDrop calls lately to just forget mouse moves I didn't have the time to handle, and I'm glad I can avoid having them buffered so easily.
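To make the difference between the two strategies concrete, here is a toy model in plain Java. It does not use RxJava at all, and the class and method names are made up for illustration. Time advances in ticks: on each tick a fast source emits up to `burst` items and the slow consumer can handle at most `capacity` of them. It only mimics the observable effect of buffering versus dropping, not how onBackpressureBuffer or onBackpressureDrop are actually implemented.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of two backpressure strategies (hypothetical names, no RxJava involved). */
final class BackpressureStrategies {

    /** Buffer strategy: every item the consumer cannot take yet waits in an unbounded queue. */
    static List<Integer> runBuffered(int total, int burst, int capacity) {
        checkArgs(burst, capacity);
        Queue<Integer> queue = new ArrayDeque<>();
        List<Integer> delivered = new ArrayList<>();
        int next = 0;
        // Keep ticking until the source is exhausted AND the queue has been drained.
        while (next < total || !queue.isEmpty()) {
            for (int i = 0; i < burst && next < total; i++) {
                queue.add(next++);               // producer side: nothing is ever lost
            }
            for (int i = 0; i < capacity && !queue.isEmpty(); i++) {
                delivered.add(queue.poll());     // consumer side: bounded work per tick
            }
        }
        return delivered;
    }

    /** Drop strategy: items that arrive while the consumer is saturated are discarded. */
    static List<Integer> runDropping(int total, int burst, int capacity) {
        checkArgs(burst, capacity);
        List<Integer> delivered = new ArrayList<>();
        int next = 0;
        while (next < total) {
            int room = capacity;                 // what the consumer can still take this tick
            for (int i = 0; i < burst && next < total; i++) {
                int item = next++;
                if (room > 0) {
                    delivered.add(item);
                    room--;
                }                                // otherwise the item is silently dropped
            }
        }
        return delivered;
    }

    private static void checkArgs(int burst, int capacity) {
        if (burst <= 0 || capacity <= 0) {
            throw new IllegalArgumentException("burst and capacity must be positive");
        }
    }

    public static void main(String[] args) {
        System.out.println("buffered: " + runBuffered(10, 5, 2));
        System.out.println("dropping: " + runDropping(10, 5, 2));
    }
}
```

With total = 10, burst = 5 and capacity = 2, runBuffered eventually delivers all ten items (at the cost of a queue that grows while the source is ahead), whereas runDropping delivers only 0, 1, 5 and 6. For something like mouse moves, losing the stale items is usually the better trade.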
Backpressure-related exceptions happen if the source Observable doesn't support backpressure; a few of the time-related operators behave this way, as do many of the Observables implemented for pre-0.20 versions. On the other hand, Subscribers by default run in unbounded mode (they request Long.MAX_VALUE and never bother requesting more). Most operators have a fast path for this case or simply don't interfere with backpressure.
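The difference between unbounded mode and a subscriber that requests items one at a time can be sketched with the JDK's own java.util.concurrent.Flow interfaces (Java 9+). These are used here as a stand-in for RxJava's Subscriber.request(n), not as the RxJava API itself. RangeSubscription, Recorder and run are hypothetical names, and the source is deliberately synchronous and single-threaded to keep the example small.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Sketch of demand signalling with the JDK Flow API (a stand-in, not RxJava). */
final class RequestSketch {

    /** Emits 0..count-1, but never more than the subscriber has requested so far. */
    static final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> subscriber;
        private final int count;
        private int next;
        private long requested;
        private boolean emitting;
        private boolean done;

        RangeSubscription(Flow.Subscriber<? super Integer> subscriber, int count) {
            this.subscriber = subscriber;
            this.count = count;
        }

        @Override
        public void request(long n) {
            if (n <= 0 || done) {
                return;
            }
            // Add the demand, capping at Long.MAX_VALUE, which means "unbounded".
            requested = (Long.MAX_VALUE - requested < n) ? Long.MAX_VALUE : requested + n;
            if (emitting) {
                return; // re-entrant call from onNext: the outer loop will see the new demand
            }
            emitting = true;
            while (requested > 0 && next < count && !done) {
                if (requested != Long.MAX_VALUE) {
                    requested--; // bounded mode: each item consumes one unit of demand
                }
                subscriber.onNext(next++);
            }
            emitting = false;
            if (next == count && !done) {
                done = true;
                subscriber.onComplete();
            }
        }

        @Override
        public void cancel() {
            done = true;
        }
    }

    /** Records what it receives and how many times it had to call request(). */
    static final class Recorder implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final boolean unbounded;
        int requestCalls;
        boolean completed;
        private Flow.Subscription subscription;

        Recorder(boolean unbounded) {
            this.unbounded = unbounded;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            requestCalls++;
            // Unbounded mode: ask for everything once and never request again.
            s.request(unbounded ? Long.MAX_VALUE : 1);
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (!unbounded) {
                requestCalls++;
                subscription.request(1); // bounded mode: ask for the next item when ready
            }
        }

        @Override
        public void onError(Throwable t) {
            throw new IllegalStateException(t);
        }

        @Override
        public void onComplete() {
            completed = true;
        }
    }

    static Recorder run(int count, boolean unbounded) {
        Recorder recorder = new Recorder(unbounded);
        recorder.onSubscribe(new RangeSubscription(recorder, count));
        return recorder;
    }

    public static void main(String[] args) {
        System.out.println("unbounded request calls: " + run(3, true).requestCalls);
        System.out.println("bounded request calls:   " + run(3, false).requestCalls);
    }
}
```

Both subscribers end up with the same three items. The unbounded one calls request() exactly once, while the bounded one calls it once up front and once more after every item, which is the extra bookkeeping the question is trying to avoid.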
The most common sources of the exceptions are the observeOn and merge operators. You'd need to reimplement them: observeOn can be switched to an unbounded queue, and merge can skip using queues entirely. Here is an example implementation of these two operators.