Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Avoiding MissingBackpressureException exceptions

Tags:

java

rx-java

Coming from C#, when I used RX and there was backpressure, items would continually be added to internal queues until the application ran out of memory (as far as I can recall).

In ReactiveX (RXJava), it would appear they have taken a different stance by throwing exceptions when backpressure starts to build.

This means that I have to use something like onBackpressureBuffer() and in the call to subscribe() pass in a Subscriber<? super T> which makes requests up the stream to release the pressure.

Maybe its because I am used the RX.NET approach to this, but it seems mental to me.

Firstly, have I understood this correctly?

Secondly, is there anyway I can "disable" this feature, so that it behaves in the same way as RX.NET as I don't want to complicate my subscribe() call by having to check if I have implemented one of these backpressure operators to see whether I have to call request() or not.

like image 732
Cheetah Avatar asked Jan 20 '15 15:01

Cheetah


Video Answer


2 Answers

In scala (I don't know Java syntax, but the method calls will be the same), you just need to turn

fastHotObservable.subscribe(next => slowFunction(next))

into

fastHotObservable.onBackpressureBuffer.subscribe(next => slowFunction(next))

That should do it. Of course, when running it there will have to be some periods of inactivity, so the process has occasionally time to catch up and process the buffered elements.

I don't think it's mental, I find it nice that you can choose the strategy to handle unhandled backpressure yourself instead of being forced into one selected for you. I also prefer having to specify it explicitly.

In fact, the strategy RX.net uses is not always the best. I have been using a few onBackpressureDrop calls lately to just forgot mouse moves I didn't have the time to handle, and I'm glad I can avoid having them buffered so easily.

like image 132
Tomáš Dvořák Avatar answered Oct 06 '22 01:10

Tomáš Dvořák


Backpressure related exception happen if the source Observable doesn't support backpressure, few of the time-related operators have this behavior and many of the Observables implemented for the pre-0.20 version. On the other hand, Subscribers by default run in unbounded mode (they request Long.MAX_VALUE and never bother requesting more). Most operators have fast-path for this case or simply don't interfere with backpressure.

Most common source of the exceptions are the observeOn and merge operators. You'd need to reimplement them. ObserveOn can be switched to an unbounded queue and merge can skip using queues entirely. Here is an example implementation of these two operators.

like image 37
akarnokd Avatar answered Oct 06 '22 01:10

akarnokd