
RxJava2 Observable backpressure

Recently I realized that I don't understand how RxJava2 backpressure works.

I wrote a small test that I expected to fail with a MissingBackpressureException:

@Test
public void testBackpressureWillFail() {
    Observable.<Integer>create(e -> {
        for (int i = 0; i < 10000; i++) {
            System.out.println("Emit: " + i);
            e.onNext(i);
        }
        e.onComplete();
    })
    .subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.computation())
    .doOnNext(i -> {
        Thread.sleep(100);
        System.out.println("Processed:" + i);
    })
    .blockingSubscribe();
}

The console output is:

Emit: 0
Emit: 1
Emit: 2
...
Emit: 9999

Processed:0
Processed:1
Processed:2
...
Processed:9999

Why doesn't it produce a MissingBackpressureException?

I expected that e.onNext(i); would put each item into the buffer of ObservableObserveOn, and that once the buffer held more items than static final int BUFFER_SIZE = Math.max(16,Integer.getInteger("rx2.buffer-size",128).intValue());

it would throw a MissingBackpressureException, but that doesn't happen. Does the buffer grow automatically? If not, where are the items stored?

Rostyslav Roshak asked Jun 21 '17 10:06



1 Answer

That's because in RxJava2 backpressure was moved out to Flowable only, see here.
If you switch to Flowable with BackpressureStrategy.MISSING, you will get the exception.
That also means that in your case you do have a buffer that grows automatically. From the observeOn docs:

Modifies an ObservableSource to perform its emissions and notifications on a specified Scheduler, asynchronously with an unbounded buffer...
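
To make the difference concrete, here is a minimal plain-Java sketch of the two behaviours. It is a toy model and not RxJava's actual implementation: the class BufferModel, the method emitAll and the exception QueueFullException are hypothetical names invented for illustration, and the capacity of 128 is taken from the BUFFER_SIZE default quoted in the question. The model assumes the worst case in which the producer emits everything before the consumer drains a single item.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model only: it mimics a queue sitting between a fast producer and a
// slow consumer. None of these names come from RxJava.
class BufferModel {

    /** Hypothetical stand-in for an overflow error such as MissingBackpressureException. */
    public static class QueueFullException extends RuntimeException {
        public QueueFullException(String message) {
            super(message);
        }
    }

    /**
     * Emits `count` items into a queue while the consumer drains nothing.
     * A capacity of 0 or less means "unbounded" (the queue simply grows).
     * Returns the peak number of items that were buffered.
     */
    public static int emitAll(int count, int capacity) {
        Queue<Integer> queue = new ArrayDeque<>();
        int peak = 0;
        for (int i = 0; i < count; i++) {
            // A bounded queue has no room left once it reaches its capacity.
            if (capacity > 0 && queue.size() == capacity) {
                throw new QueueFullException("Queue is full at item " + i);
            }
            queue.add(i);
            peak = Math.max(peak, queue.size());
        }
        return peak;
    }

    public static void main(String[] args) {
        // Unbounded: every item is stored, nothing fails.
        System.out.println("Unbounded peak size: " + emitAll(10000, 0));

        // Bounded at 128: the 129th item (index 128) no longer fits.
        try {
            emitAll(10000, 128);
        } catch (QueueFullException ex) {
            System.out.println("Bounded: " + ex.getMessage());
        }
    }
}
```

The unbounded case corresponds to what the question observes with Observable: all items are accepted and held until the slow consumer gets to them, at the cost of memory. The bounded case corresponds to the failure the question expected, which per the answer above only shows up once you switch to Flowable.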

yosriz answered Sep 25 '22 21:09