Flux.just("a", "b")
.flatMap(s -> s.equals("a") ? Mono.error(new RuntimeException() : Flux.just(s + "1", s + "2"))
.onErrorResume(throwable -> Mono.empty())
.subscribe(System.out::println);
Hello!
Here I made a flux of two elements and then expose by flatMap first one to exception, and second one to another Flux.
With onErrorResume
I expect the output
b1
b2
but get nothing. Could anyone explain why does it happens, please?
Thanks.
This tutorial introduces the map and flatMap operators in Project Reactor. They're defined in the Mono and Flux classes to transform items when processing a stream. In the following sections, we'll focus on the map and flatMap methods in the Flux class. Those of the same name in the Mono class work just the same way. 2. Maven Dependencies
For example, there is a method that always throws error as shown below There are four useful operators in Reactor for handling error: doOnError, onErrorMap, onErrorReturn, and onErrorResume. All of them have different usage, but they have a similarity.
Similar to the map operator, we can use the handle operator to process items in a stream one by one. The difference is that Reactor provides the handle operator with an output sink, allowing us to apply more complicated transformations. Let's update our example from the previous section to use the handle operator:
As required by the specification, Reactor throws a NullPointerException when a null value reaches the map function. Therefore, there's nothing we can do about a null value when it reaches a certain stream. We can't handle it or convert it to a non- null value before passing it downstream.
Given this:
Flux.just("a", "b", "c")
.flatMap { s ->
if (s == "b")
Mono.error<RuntimeException>(RuntimeException())
else
Flux.just(s + "1", s + "2")
}.onErrorResume { throwable -> Mono.just("d") }.log()
.subscribe { println(it) }
The output is:
12:35:19.673 [main] INFO reactor.Flux.OnErrorResume.1 - onSubscribe(FluxOnErrorResume.ResumeSubscriber)
12:35:19.676 [main] INFO reactor.Flux.OnErrorResume.1 - request(unbounded)
12:35:19.677 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(a1)
a1
12:35:19.677 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(a2)
a2
12:35:19.712 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(d)
d
12:35:19.713 [main] INFO reactor.Flux.OnErrorResume.1 - onComplete()
What's going on here? onErrorResume()
is being applied to the Publisher returned by the flatMap()
operator. Since on "b" the Publisher signals a failure, the flatMap()
Publisher doesn't execute anymore and onErrorResume()
operator keeps publishing using its fallback.
The documentation for onErrorResume() shows clearly that the original Publisher finishes because of the error and the fallback takes over:
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With