I recently realized that I don't understand how RxJava2 backpressure works.
I wrote a small test that I expected to fail with a MissingBackpressureException:
    @Test
    public void testBackpressureWillFail() {
        Observable.<Integer>create(e -> {
            for (int i = 0; i < 10000; i++) {
                System.out.println("Emit: " + i);
                e.onNext(i);
            }
            e.onComplete();
        })
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.computation())
        .doOnNext(i -> {
            Thread.sleep(100);
            System.out.println("Processed:" + i);
        })
        .blockingSubscribe();
    }
The console output is:
Emit: 0
Emit: 1
Emit: 2
...
Emit: 9999
Processed:0
Processed:1
Processed:2
...
Processed:9999
Why doesn't it produce a MissingBackpressureException?
I expected that e.onNext(i) would put the item into the buffer of ObservableObserveOn, and that once the buffer's size exceeded
static final int BUFFER_SIZE = Math.max(16, Integer.getInteger("rx2.buffer-size", 128).intValue());
it would throw a MissingBackpressureException, which doesn't happen. Does the buffer grow automatically? If not, where are the items stored?
What is Backpressure? Backpressure is simply the process of handling a fast producer. In the case of an Observable generating 1 million items per second, how can a subscriber that can only process 100 items per second handle these items?
Observables are used when there are relatively few items over time and there is no risk of overwhelming the consumer. If the consumer could be overwhelmed, we use Flowable instead. One example is reading a huge amount of data from a sensor: sensors typically push out data at a high rate.
Back-Pressure: When one component is struggling to keep up, the system as a whole needs to respond in a sensible way. It is unacceptable for the component under stress to fail catastrophically or to drop messages in an uncontrolled fashion.
In software systems, backpressure describes an overloaded communication channel: emitters produce data faster than consumers are able to process it. The term is also commonly used for the mechanism that controls and handles this overload.
That's because in RxJava2 backpressure was moved out of Observable and is supported only by Flowable, see here.
If you switch to Flowable with BackpressureStrategy.MISSING, you will get the exception.
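As a rough sketch of that switch, here is the test from the question rewritten with Flowable.create and BackpressureStrategy.MISSING. It assumes RxJava 2.x (the io.reactivex packages) is on the classpath. The class name MissingBackpressureDemo and the helper run() are made up for illustration, and the 10 ms sleep is an arbitrary stand-in for a slow consumer.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.atomic.AtomicReference;

class MissingBackpressureDemo {

    // Runs the pipeline and returns the error it terminated with (null if it completed normally).
    static Throwable run() {
        AtomicReference<Throwable> failure = new AtomicReference<>();

        Flowable.<Integer>create(e -> {
            // The emitter ignores downstream demand and pushes everything at once.
            for (int i = 0; i < 10000; i++) {
                e.onNext(i);
            }
            e.onComplete();
        }, BackpressureStrategy.MISSING)   // no buffering or dropping is applied on overflow
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.computation())  // Flowable.observeOn uses a bounded buffer
        .doOnNext(i -> Thread.sleep(10))      // slow consumer
        .blockingSubscribe(i -> { }, failure::set);

        return failure.get();
    }

    public static void main(String[] args) {
        // Expected to print a MissingBackpressureException once the bounded buffer fills up.
        System.out.println(run());
    }
}
```

The only real changes from the question's code are Flowable instead of Observable and the extra BackpressureStrategy argument. Picking BUFFER, DROP or LATEST instead of MISSING would handle the overflow rather than report it.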
It also means that in your case you do indeed have a buffer that grows automatically. From the observeOn docs:
Modifies an ObservableSource to perform its emissions and notifications on a specified Scheduler, asynchronously with an unbounded buffer...
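To make the difference between an unbounded and a bounded buffer concrete, here is a toy model that uses only java.util queues. It is not RxJava's actual implementation: BufferModel and fill are hypothetical names, and the capacity of 128 is borrowed from the default BUFFER_SIZE quoted in the question. It only shows that an unbounded queue accepts every item when nothing drains it, while a bounded one rejects the first item that does not fit. That rejection is the point where a bounded operator would have to signal an overflow error.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

class BufferModel {

    // Offers `count` items to `buffer` with no consumer draining it and
    // returns how many were accepted before the first rejection.
    static int fill(Queue<Integer> buffer, int count) {
        for (int i = 0; i < count; i++) {
            if (!buffer.offer(i)) {
                return i; // buffer is full: a bounded operator would report overflow here
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Unbounded queue: grows as needed, so all 10000 items are accepted.
        System.out.println(fill(new ArrayDeque<>(), 10000));            // prints 10000

        // Queue bounded to 128 slots: the 129th offer is rejected.
        System.out.println(fill(new ArrayBlockingQueue<>(128), 10000)); // prints 128
    }
}
```

The cost of the unbounded variant is memory: with a fast producer and a slow consumer the queue simply keeps growing, which is why all 10000 items in the question are emitted long before they are processed.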