Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Project reactor: onErrorResume after flatMap

Flux.just("a", "b")
        .flatMap(s -> s.equals("a") ? Mono.error(new RuntimeException() : Flux.just(s + "1", s + "2"))
        .onErrorResume(throwable -> Mono.empty())
        .subscribe(System.out::println);

Hello!

Here I made a flux of two elements and then expose by flatMap first one to exception, and second one to another Flux.

With onErrorResume I expect the output

b1
b2

but get nothing. Could anyone explain why does it happens, please?

Thanks.

like image 390
G.Domozhirov Avatar asked Mar 22 '18 14:03

G.Domozhirov


People also ask

What are the map and flatMap operators in project reactor?

This tutorial introduces the map and flatMap operators in Project Reactor. They're defined in the Mono and Flux classes to transform items when processing a stream. In the following sections, we'll focus on the map and flatMap methods in the Flux class. Those of the same name in the Mono class work just the same way. 2. Maven Dependencies

How to handle error in reactor?

For example, there is a method that always throws error as shown below There are four useful operators in Reactor for handling error: doOnError, onErrorMap, onErrorReturn, and onErrorResume. All of them have different usage, but they have a similarity.

What is the difference between map and handle in reactor?

Similar to the map operator, we can use the handle operator to process items in a stream one by one. The difference is that Reactor provides the handle operator with an output sink, allowing us to apply more complicated transformations. Let's update our example from the previous section to use the handle operator:

How to handle null values in a stream in reactor?

As required by the specification, Reactor throws a NullPointerException when a null value reaches the map function. Therefore, there's nothing we can do about a null value when it reaches a certain stream. We can't handle it or convert it to a non- null value before passing it downstream.


1 Answers

Given this:

Flux.just("a", "b", "c")
        .flatMap { s ->
            if (s == "b") 
                Mono.error<RuntimeException>(RuntimeException()) 
            else 
                Flux.just(s + "1", s + "2")
        }.onErrorResume { throwable -> Mono.just("d") }.log()
        .subscribe { println(it) }

The output is:

12:35:19.673 [main] INFO reactor.Flux.OnErrorResume.1 - onSubscribe(FluxOnErrorResume.ResumeSubscriber)
12:35:19.676 [main] INFO reactor.Flux.OnErrorResume.1 - request(unbounded)
12:35:19.677 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(a1)
a1
12:35:19.677 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(a2)
a2
12:35:19.712 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(d)
d
12:35:19.713 [main] INFO reactor.Flux.OnErrorResume.1 - onComplete()

What's going on here? onErrorResume() is being applied to the Publisher returned by the flatMap() operator. Since on "b" the Publisher signals a failure, the flatMap() Publisher doesn't execute anymore and onErrorResume() operator keeps publishing using its fallback.

The documentation for onErrorResume() shows clearly that the original Publisher finishes because of the error and the fallback takes over:

enter image description here

like image 171
codependent Avatar answered Sep 20 '22 22:09

codependent