I am new to reactive programming and I am trying to build a simple backpressure-aware message processing pipeline using RxJava 2.
This is the workflow I am trying to achieve:
A Flowable that emits a continuous stream of strings.
Perform a time-consuming operation that changes each message into another string.
Perform another time-consuming operation.
I am currently using the following code:
{
    // The explicit <String> keeps the chained operators typed as String rather than Object
    Flowable.<String>create(subscriber -> {
        some_stream.forEach(data -> {
            subscriber.onNext(data);
        });
    }, BackpressureStrategy.BUFFER)
        .subscribeOn(Schedulers.io())           // data emission runs on the io scheduler
        .observeOn(Schedulers.computation())    // the map operation runs on the computation scheduler
        .map(val -> Time_Consuming_Task(val))   // the task returns another string
        .observeOn(Schedulers.io())             // the final consumer runs on the io scheduler
        .subscribe(val -> Another_Time_Consuming_Task(val));
}
For small streams I don't see any backpressure-related issues.
But for large streams I don't know how it will behave.
My questions are:
What is the default buffer size for BackpressureStrategy.BUFFER, and where does the data get buffered?
If I want two backpressure buffers, one before each time-consuming task, should I use the onBackpressureBuffer operator?
If a buffer gets full, I don't want to lose data. How can I make the producer wait in that case?
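The default buffer size is 128 (the value returned by Flowable.bufferSize()). This has been lowered from the previous 1024; the change can be seen in the RxJava repository. There is also a system property which you can use to adjust it yourself if needed. In RxJava 2 the property is rx2.buffer-size (rx.ring-buffer.size is the RxJava 1.x name):
System.setProperty("rx2.buffer-size", "8");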
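How a property-backed default of this kind is typically resolved can be sketched in plain Java. This is only an illustration and does not touch RxJava; the class name BufferSizeProperty and the key demo.buffer-size are hypothetical.

```java
final class BufferSizeProperty {
    // Reads an integer system property, falls back to a default when it is
    // not set, and clamps the result to at least 1.
    static int resolve(String key, int fallback) {
        return Math.max(1, Integer.getInteger(key, fallback));
    }

    public static void main(String[] args) {
        String key = "demo.buffer-size"; // hypothetical key, for illustration only
        System.clearProperty(key);
        System.out.println(resolve(key, 128)); // property not set: prints 128
        System.setProperty(key, "8");
        System.out.println(resolve(key, 128)); // property set: prints 8
    }
}
```

If a library reads such a property only once in a static initializer, which is my understanding of how RxJava handles its buffer size, the property has to be set before that class is first loaded, for example with a -D flag on the command line.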
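As the name ring buffer suggests, the data is held in memory. From the Wikipedia article on circular buffers:
A consequence of the circular buffer is that when it is full and a subsequent write is performed, then it starts overwriting the oldest data.
Note that this describes circular buffers in general. The buffer created by BackpressureStrategy.BUFFER is unbounded: it keeps growing until the downstream consumes the items or memory runs out, so the default size only serves as its initial capacity.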
Once you know the buffer size, the best thing you can do is to use the following API from RxJava 2:
onBackpressureBuffer(long capacity, // The bound of this buffer, not a setter for the default buffer size
Action onOverflow, // The action to execute on overflow
BackpressureOverflowStrategy overflowStrategy) // The strategy to apply on overflow
Again, as I could not say it any better, let me quote the RxJava wiki on Backpressure (2.0):
The BackpressureOverflow.Strategy is an interface actually but the class BackpressureOverflow offers 4 static fields with implementations of it representing typical actions:
ON_OVERFLOW_ERROR: this is the default behavior of the previous two overloads, signalling a BufferOverflowException
ON_OVERFLOW_DEFAULT: currently it is the same as ON_OVERFLOW_ERROR
ON_OVERFLOW_DROP_LATEST: if an overflow would happen, the current value will be simply ignored and only the old values will be delivered once the downstream requests.
ON_OVERFLOW_DROP_OLDEST: drops the oldest element in the buffer and adds the current value to it.
Note that the last two strategies cause discontinuity in the stream as they drop out elements. In addition, they won't signal BufferOverflowException.
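To make the quoted strategy descriptions concrete, here is a small simulation in plain Java. It follows the wording of the quote only: it does not call RxJava and makes no claim about the library's internals. The class OverflowDemo and its Strategy enum are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class OverflowDemo {
    // Hypothetical names mirroring the behaviours described in the quote.
    enum Strategy { ERROR, DROP_LATEST, DROP_OLDEST }

    // Offers every value to a buffer of the given capacity while nothing is
    // consumed, then returns what a late consumer would finally receive.
    static List<Integer> fill(int capacity, Strategy strategy, int... values) {
        Deque<Integer> buffer = new ArrayDeque<>(capacity);
        for (int value : values) {
            if (buffer.size() < capacity) {
                buffer.addLast(value);
                continue;
            }
            switch (strategy) {
                case ERROR:
                    throw new IllegalStateException("buffer is full");
                case DROP_LATEST:
                    break; // ignore the incoming value, keep the old ones
                case DROP_OLDEST:
                    buffer.removeFirst(); // discard the oldest buffered value
                    buffer.addLast(value);
                    break;
            }
        }
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        System.out.println(fill(3, Strategy.DROP_LATEST, 1, 2, 3, 4, 5)); // [1, 2, 3]
        System.out.println(fill(3, Strategy.DROP_OLDEST, 1, 2, 3, 4, 5)); // [3, 4, 5]
    }
}
```

Both dropping variants leave gaps in the sequence, which is the discontinuity the quote warns about.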
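Here's an example. Note that in the released RxJava 2 API the strategies are the enum constants BackpressureOverflowStrategy.ERROR, DROP_LATEST and DROP_OLDEST, rather than the names used in the quote:
import io.reactivex.BackpressureOverflowStrategy;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

Flowable.range(1, 1_000_000)
    .onBackpressureBuffer(16, () -> { },            // capacity 16, no-op overflow callback
        BackpressureOverflowStrategy.DROP_OLDEST)   // on overflow, discard the oldest buffered item
    .observeOn(Schedulers.computation())
    .subscribe(e -> { }, Throwable::printStackTrace);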
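A dropping strategy discards elements, which the question explicitly wants to avoid. One general way to make the producer wait instead is a bounded blocking queue between producer and consumer. The sketch below uses only java.util.concurrent and is not an RxJava operator; the class BlockingHandoff, the END sentinel, and the 1 ms sleep standing in for the time-consuming task are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class BlockingHandoff {
    private static final int END = -1; // sentinel marking the end of the stream

    // Pushes 1..count through a bounded queue to a slow consumer and returns
    // what the consumer received. put() blocks while the queue is full, so the
    // producer waits instead of dropping values.
    static List<Integer> run(int count, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        List<Integer> received = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                for (int v = queue.take(); v != END; v = queue.take()) {
                    Thread.sleep(1); // stand-in for the time-consuming task
                    received.add(v);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (int i = 1; i <= count; i++) {
                queue.put(i); // blocks when 'capacity' items are already waiting
            }
            queue.put(END);
            consumer.join(); // after join, the consumer's writes are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during hand-off", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(20, 4).size()); // 20: nothing was dropped
    }
}
```

Within RxJava itself, a comparable effect comes from pull-based sources such as Flowable.generate or Flowable.fromIterable, which emit only as many items as the downstream has requested.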
Worth noting:
The Observable type in RxJava 2.x has no concept of backpressure. Implementing Observable is effectively the same as using onBackpressureBuffer() by default. UI events, one-off network requests, and state changes should all work with this approach. The Completable, Maybe, and Single types can also dictate this behavior.
If you need to support backpressure, RxJava 2.x's new class, Flowable, is backpressure-aware like Observable was in RxJava 1.x. However, the updated library now requires an explicit choice of a backpressure strategy to prevent surprise MissingBackpressureExceptions.