I've been working with Spring Reactor, and some earlier testing made me wonder how a Flux handles backpressure by default. I know that onBackpressureBuffer and similar operators exist, and I have also read that RxJava defaults to unbounded until you define whether to buffer, drop, etc.
So, can anyone clarify for me: What is the default backpressure behavior for a Flux in Reactor 3?
I tried searching but didn't find a clear answer, only definitions of backpressure or the answer linked above for RxJava.
Backpressure in WebFlux: to understand what backpressure control means, we have to recap what backpressure means from the Reactive Streams specification perspective. The basic semantics define how the transmission of stream elements is regulated through backpressure.
Aside from scaling up your available compute resources, how you handle backpressure can pretty much be summed up with three possible options: control the producer (the consumer decides whether it slows down or speeds up), buffer (accumulate incoming data spikes temporarily), or drop (sample a percentage of the incoming data).
Project Reactor is a direct implementation of the Reactive Streams Specification. The main feature of Reactive Streams Specification is that it provides a medium of communication between the stream producer and stream consumer so that a consumer can demand the stream according to its processing capabilities.
What is Backpressure? Backpressure is simply the process of handling a fast producer. In the case of an Observable generating 1 million items per second, how can a subscriber that can only process 100 items per second handle these items?
"Backpressure or the ability for the consumer to signal the producer that the rate of emission is too high" - Reactor Reference
When we are talking about backpressure we have to separate sources/publishers into two groups: the ones that respect the demand from the subscriber, and those that ignore it.
Generally, hot sources do not respect subscriber demand, since they often produce live data, such as a Twitter feed you are listening to. In this example the subscriber has no control over the rate at which tweets are created, so it could easily get overwhelmed.
On the other hand a cold source usually generates data on demand when subscription happens, like making an HTTP request and then processing the response. In this case the HTTP server you are calling will only send a response after you sent your request.
It is important to note that this is not a rule: not every hot source ignores the demand and not every cold source respects it. You can read more on hot and cold sources here.
Let's look at some examples that might help in understanding, starting with a publisher that respects demand.
Given a Flux that produces numbers from 1 to Integer.MAX_VALUE and a processing step that takes 100 ms per element:
Flux.range(1, Integer.MAX_VALUE)
    .log()
    .concatMap(x -> Mono.delay(Duration.ofMillis(100)), 1) // simulate that processing takes time
    .blockLast();
Let's see the logs:
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(1)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(2)
[ INFO] (parallel-1) | request(1)
[ INFO] (parallel-1) | onNext(3)
[ INFO] (parallel-2) | request(1)
[ INFO] (parallel-2) | onNext(4)
[ INFO] (parallel-3) | request(1)
[ INFO] (parallel-3) | onNext(5)
We can see that before every onNext there is a request. The request signal is sent by the concatMap operator once it has finished the current element and is ready to accept the next one. The source only sends the next item when it receives a request from downstream.
In this example backpressure is automatic; we don't need to define any strategy because the operator knows what it can handle and the source respects it.
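The request/onNext handshake in the log can be reproduced with the JDK's own java.util.concurrent.Flow interfaces, which mirror the Reactive Streams contract that Reactor implements. The sketch below deliberately uses those instead of Reactor so that it runs without dependencies. The names range and OneAtATime are hypothetical helpers, and the code leaves out the thread-safety and argument checks a real publisher needs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class DemandDemo {

    /** Hypothetical cold source of 1..count that emits only what has been requested. */
    static Flow.Publisher<Integer> range(int count, List<String> log) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private long demand;
            private boolean draining;
            private boolean done;

            @Override
            public void request(long n) {
                log.add("request(" + n + ")");
                demand += n;
                if (draining) {
                    return; // a request made inside onNext is served by the loop below
                }
                draining = true;
                while (demand > 0 && !done) {
                    if (next > count) {
                        done = true;
                        log.add("onComplete()");
                        subscriber.onComplete();
                    } else {
                        demand--;
                        int value = next++;
                        log.add("onNext(" + value + ")");
                        subscriber.onNext(value);
                    }
                }
                draining = false;
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscriber that asks for exactly one element at a time. */
    static final class OneAtATime implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;   // store first: request() may call onNext synchronously
            s.request(1);
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ready for the next element
        }

        @Override
        public void onError(Throwable t) {
            throw new IllegalStateException(t);
        }

        @Override
        public void onComplete() {
        }
    }

    /** Subscribes a one-at-a-time consumer and returns the signal log. */
    static List<String> run(int count) {
        List<String> log = new ArrayList<>();
        range(count, log).subscribe(new OneAtATime());
        return log;
    }

    public static void main(String[] args) {
        run(3).forEach(System.out::println);
    }
}
```

Running main prints the same alternating request(1)/onNext pattern as the log above, followed by a final request(1) and onComplete(). The source never emits without outstanding demand, which is why no overflow strategy is needed here.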
Now for a publisher that ignores demand. For the sake of simplicity I selected an easy-to-understand cold publisher for this example: Flux.interval, which emits one item per specified time interval. It makes sense that this cold publisher does not respect demand, since it would be quite strange to see items emitted at different, longer intervals than the one originally specified.
Let's see the code:
Flux.interval(Duration.ofMillis(1))
    .log()
    .concatMap(x -> Mono.delay(Duration.ofMillis(100)))
    .blockLast();
The source emits one item every millisecond, while the subscriber can process only one item every 100 milliseconds. The subscriber clearly cannot keep up with the producer, and quite soon we get an exception like this:
reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
...
What can we do to avoid this exception?
For a source that ignores demand like this one, the default behavior is what we have seen above: the sequence terminates with an error. Reactor does not impose any overflow-handling strategy on us. When we see this kind of error, we can decide which strategy is the most applicable for our use case.
You can find a couple of them in the Reactor reference.
For this example we will use the simplest one: onBackpressureDrop.
Flux.interval(Duration.ofMillis(1))
    .onBackpressureDrop()
    .concatMap(a -> Mono.delay(Duration.ofMillis(100)).thenReturn(a))
    .doOnNext(a -> System.out.println("Element kept by consumer: " + a))
    .blockLast();
Output:
Element kept by consumer: 0
Element kept by consumer: 1
Element kept by consumer: 2
Element kept by consumer: 3
Element kept by consumer: 4
Element kept by consumer: 5
Element kept by consumer: 6
Element kept by consumer: 7
Element kept by consumer: 8
Element kept by consumer: 9
Element kept by consumer: 10
Element kept by consumer: 11
Element kept by consumer: 12
Element kept by consumer: 13
Element kept by consumer: 14
Element kept by consumer: 15
Element kept by consumer: 16
Element kept by consumer: 17
Element kept by consumer: 18
Element kept by consumer: 19
Element kept by consumer: 20
Element kept by consumer: 21
Element kept by consumer: 22
Element kept by consumer: 23
Element kept by consumer: 24
Element kept by consumer: 25
Element kept by consumer: 26
Element kept by consumer: 27
Element kept by consumer: 28
Element kept by consumer: 29
Element kept by consumer: 30
Element kept by consumer: 31
Element kept by consumer: 2399
Element kept by consumer: 2400
Element kept by consumer: 2401
Element kept by consumer: 2402
Element kept by consumer: 2403
Element kept by consumer: 2404
Element kept by consumer: 2405
Element kept by consumer: 2406
Element kept by consumer: 2407
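We can see that after the first 32 items (0 to 31) there is a big jump to 2399. The elements in between were dropped by the strategy we defined. Note that 32 is also the point at which the previous pipeline failed with "Could not emit tick 32".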
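The difference between the failing pipeline and the one with onBackpressureDrop can be modelled in plain Java, without Reactor. This is only a sketch under stated assumptions: Gate, Strategy and run are hypothetical names, and the one-time credit of 32 is chosen to match the tick number in the exception above. Real operators replenish demand as they consume, which this model leaves out.

```java
import java.util.ArrayList;
import java.util.List;

final class OverflowModel {

    enum Strategy { ERROR, DROP }

    /** Hypothetical boundary between a source that ignores demand and a slow consumer. */
    static final class Gate {
        final List<Long> kept = new ArrayList<>();
        long dropped;
        private final Strategy strategy;
        private long demand; // credit granted by the consumer and not yet used

        Gate(Strategy strategy) {
            this.strategy = strategy;
        }

        /** The consumer grants credit for n more elements. */
        void request(long n) {
            demand += n;
        }

        /** The source pushes a tick whether or not any credit is left. */
        void emit(long tick) {
            if (demand > 0) {
                demand--;
                kept.add(tick);
            } else if (strategy == Strategy.DROP) {
                dropped++; // discard the element and keep the sequence alive
            } else {
                throw new IllegalStateException(
                        "Could not emit tick " + tick + " due to lack of requests");
            }
        }
    }

    /** Grants 32 elements of credit once, then lets the source emit 40 ticks. */
    static Gate run(Strategy strategy) {
        Gate gate = new Gate(strategy);
        gate.request(32);
        for (long tick = 0; tick < 40; tick++) {
            gate.emit(tick);
        }
        return gate;
    }

    public static void main(String[] args) {
        Gate drop = run(Strategy.DROP);
        System.out.println("kept=" + drop.kept.size() + ", dropped=" + drop.dropped);
        try {
            run(Strategy.ERROR);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With the DROP strategy the model keeps ticks 0 to 31 and discards the remaining 8. With ERROR it fails on tick 32, the first one that arrives without credit.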
UPDATE: Useful read: How to control request rate