
Difference between BackpressureStrategy.BUFFER and onBackpressureBuffer operator in rxjava 2

I am new to the world of reactive programming and I am trying to create a simple backpressure aware message processing using rxjava 2.

Following is the workflow I am trying to achieve:

  1. Flowable of a continuous string stream.

  2. Perform a time consuming operation and change the message to another string

  3. Perform another time consuming operation.

I am currently using the following code:

{
    Flowable.<String>create(subscriber -> { // explicit type so the chain works on String, not Object
             some_stream.forEach(data -> {
                subscriber.onNext(data);
            });
        }, BackpressureStrategy.BUFFER).
    subscribeOn(Schedulers.io()). // Data emission will run on the io scheduler
    observeOn(Schedulers.computation()). // Map operation will run on the computation scheduler
    map(val -> Time_Consuming_Task(val)). // Task returns another string
    observeOn(Schedulers.io()). // Next consumer will run on the io scheduler
    subscribe(val -> Another_Time_Consuming_Task(val));
}

Now for small operations I don't see any back pressure related issues.

But for large streams I don't know how it will behave.

My questions are:

  1. What is the default buffer size in the case of BackpressureStrategy.BUFFER, and where does the data get buffered?

  2. If I want two backpressure buffers, one before each time-consuming task, should I use the onBackpressureBuffer operator?

  3. If the buffer gets full, I don't want to lose data. Is there a way to wait in that case?

Rahul khandelwal asked May 12 '17 09:05


1 Answer

To answer your questions:

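1. The default buffer size differs by version and platform. In RxJava 1.x it is 128 items per ring buffer on the JVM and 16 items on Android. In RxJava 2.x the default, returned by Flowable.bufferSize(), is 128. Note that BackpressureStrategy.BUFFER itself is unbounded: it keeps every onNext value until the downstream consumes it, so these sizes apply to the bounded queues inside operators such as observeOn.

The 1.x JVM value was lowered from the previous 1024 (you can see the change being implemented in RxJava here). There is also a system property which you can use to adjust it yourself if needed, named rx.ring-buffer.size in RxJava 1.x and rx2.buffer-size in RxJava 2.x:

System.setProperty("rx2.buffer-size", "8"); // RxJava 2.x; set it before any RxJava class is loaded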

Ring buffers live in memory, so that is where the data is buffered. You can read more about them here.

2. & 3. When a ring buffer gets full, it starts overwriting itself. To control what happens in that case, use onBackpressureBuffer.

A consequence of the circular buffer is that when it is full and a subsequent write is performed, then it starts overwriting the oldest data.

Quote from the wiki article about Circular buffer.
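To make the quoted behaviour concrete, here is a minimal plain-Java sketch of a circular buffer that overwrites its oldest entry once full. It is not RxJava's internal queue; the class name RingBufferSketch and its methods are made up for illustration.

```java
import java.util.Arrays;

// Illustrative only: a fixed-size circular buffer, not RxJava's implementation.
class RingBufferSketch {
    private final int[] slots;
    private int writeIndex = 0; // next slot to be written
    private int size = 0;       // number of valid entries

    RingBufferSketch(int capacity) {
        slots = new int[capacity];
    }

    // Writes a value; once the buffer is full this overwrites the oldest entry.
    void write(int value) {
        slots[writeIndex] = value;
        writeIndex = (writeIndex + 1) % slots.length;
        if (size < slots.length) {
            size++;
        }
    }

    // Returns the current contents, oldest entry first.
    int[] snapshot() {
        int[] out = new int[size];
        int start = (writeIndex - size + slots.length) % slots.length;
        for (int i = 0; i < size; i++) {
            out[i] = slots[(start + i) % slots.length];
        }
        return out;
    }

    public static void main(String[] args) {
        RingBufferSketch buffer = new RingBufferSketch(3);
        for (int i = 1; i <= 5; i++) {
            buffer.write(i);
        }
        System.out.println(Arrays.toString(buffer.snapshot())); // prints [3, 4, 5]
    }
}
```

Writing five values into three slots silently discards 1 and 2, which is exactly the kind of data loss the question wants to avoid.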

Once you know your buffer size, the best thing you can do is to use the following API given in RxJava 2:

onBackpressureBuffer(long capacity, // This is the given bound, not a setter for the ring buffer
    Action onOverflow, // The desired action to execute
    BackpressureOverflowStrategy overflowStrategy) // The desired strategy to use

Again, as I could not say it any better, let me quote the RxJava wiki on Backpressure (2.0):

The BackpressureOverflow.Strategy is an interface actually but the class BackpressureOverflow offers 4 static fields with implementations of it representing typical actions:

  • ON_OVERFLOW_ERROR: this is the default behavior of the previous two overloads, signalling a BufferOverflowException
  • ON_OVERFLOW_DEFAULT: currently it is the same as ON_OVERFLOW_ERROR
  • ON_OVERFLOW_DROP_LATEST: if an overflow would happen, the current value will be simply ignored and only the old values will be delivered once the downstream requests.
  • ON_OVERFLOW_DROP_OLDEST: drops the oldest element in the buffer and adds the current value to it.

Note that the last two strategies cause discontinuity in the stream as they drop out elements. In addition, they won't signal BufferOverflowException.
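The difference between the strategies is easier to see on a tiny bounded buffer. The following plain-Java sketch models the three behaviours as they are described in the quote above; it does not use RxJava, the real operator may behave differently in detail, and the names OverflowStrategySketch, Strategy and offerAll are hypothetical. IllegalStateException merely stands in for the library's overflow error.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Illustrative model of the quoted overflow strategies, not RxJava code.
class OverflowStrategySketch {
    enum Strategy { ERROR, DROP_LATEST, DROP_OLDEST }

    // Offers all values to a buffer of the given capacity with no consumer
    // draining it, and returns what is left in the buffer, oldest first.
    static List<Integer> offerAll(int capacity, Strategy strategy, int... values) {
        ArrayDeque<Integer> buffer = new ArrayDeque<>(capacity);
        for (int value : values) {
            if (buffer.size() < capacity) {
                buffer.addLast(value);
                continue;
            }
            switch (strategy) {
                case ERROR:
                    // Stand-in for the overflow exception signalled downstream.
                    throw new IllegalStateException("Buffer is full");
                case DROP_LATEST:
                    // Ignore the incoming value and keep the old ones.
                    break;
                case DROP_OLDEST:
                    // Evict the oldest value to make room for the new one.
                    buffer.pollFirst();
                    buffer.addLast(value);
                    break;
            }
        }
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        System.out.println(offerAll(3, Strategy.DROP_LATEST, 1, 2, 3, 4, 5)); // prints [1, 2, 3]
        System.out.println(offerAll(3, Strategy.DROP_OLDEST, 1, 2, 3, 4, 5)); // prints [3, 4, 5]
    }
}
```

Both dropping strategies lose two of the five values; only the error strategy makes the overflow visible to the caller.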

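Here's an example. In RxJava 2.x the strategies are the constants ERROR, DROP_LATEST and DROP_OLDEST of the BackpressureOverflowStrategy enum:

Flowable.range(1, 1_000_000)
          .onBackpressureBuffer(16, () -> { }, // capacity of 16 items, no-op overflow callback
              BackpressureOverflowStrategy.DROP_OLDEST) // evict the oldest buffered item on overflow
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);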

Worth noting:

The Observable type in RxJava 2.x has no concept of backpressure. Implementing Observable is effectively the same as using onBackpressureBuffer() by default. UI events, one-off network requests, and state changes should all work with this approach. The Completable, Maybe, and Single types can also dictate this behavior.

If you need to support backpressure, RxJava 2.x’s new class, Flowable, is backpressure-aware like Observable was in RxJava 1.x. However, the updated library now requires an explicit choice of a backpressure strategy to prevent surprise MissingBackpressureExceptions.
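The third question asks for a producer that waits instead of losing data. As a sketch of that idea in isolation, the example below does not use RxJava; it uses the JDK's own java.util.concurrent.Flow API (Java 9+), where SubmissionPublisher.submit blocks the caller while the subscriber's bounded buffer is full. The class name BlockingProducerSketch, the item count and the capacity are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;

// Illustrative only: lossless backpressure with the JDK Flow API, not RxJava.
class BlockingProducerSketch {

    // Publishes 0..itemCount-1 through a bounded buffer and returns what the subscriber received.
    static List<Integer> run(int itemCount, int bufferCapacity) {
        List<Integer> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<>(ForkJoinPool.commonPool(), bufferCapacity)) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1); // ask for one item at a time
                }
                @Override public void onNext(Integer item) {
                    received.add(item);      // stand-in for a time-consuming task
                    subscription.request(1); // ready for the next item
                }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });

            for (int i = 0; i < itemCount; i++) {
                publisher.submit(i); // blocks while the subscriber's buffer is full
            }
        } // close() signals onComplete once the buffered items have been delivered

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(1000, 4).size()); // prints 1000
    }
}
```

Because the producer is paused rather than the data being dropped, every item arrives in order even though the buffer holds only a handful of items at a time.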

Read more:

  • src/main/java/rx/internal/util/RxRingBuffer.java#L246
anthonymonori answered Oct 16 '22 04:10