
Mono async exception handling

I'm trying to figure out how exception handling works in the Reactor library.

Consider the following example:

public class FluxTest {

    @Test
    public void testIt() throws InterruptedException {
        Scheduler single = Schedulers.single();

        CountDownLatch latch = new CountDownLatch(1);

        Mono.just("hey")
                .subscribeOn(single)
                .doOnError(e -> System.out.println("ERROR1: " + e.getMessage()))
                .doOnTerminate((r, e) -> {
                    if (e != null) System.out.println("ERROR3: " + e.getMessage());
                    latch.countDown();
                })
                .subscribe(
                        it -> { throw new RuntimeException("Error for " + it); },
                        ex -> { System.out.println("ERROR2: " + ex.getMessage()); }
                );

        latch.await();
    }

}

However, none of the error-handling blocks execute: the test terminates without printing any message. Removing the doOnError and doOnTerminate operators makes no difference.

mgulimonov asked Jun 08 '17 18:06



1 Answer

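This is the expected behavior in your case. You are creating a Mono from a single string, "hey", which emits no error signal, so doOnError has nothing to react to. The exception thrown inside the subscribe consumer is raised downstream of those operators, so they never see it. If you debug the test, you will see that doOnTerminate is called with e = null; according to the documentation, it is called on any termination, whether success or error.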

To exercise the error handlers, make the Mono itself fail, for example by throwing from a map step:

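import java.util.concurrent.CountDownLatch;

import org.junit.Test; // JUnit 4 assumed; the original does not say which version
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class FluxTest {
    @Test
    public void testIt() throws InterruptedException {
        Scheduler single = Schedulers.single();

        CountDownLatch latch = new CountDownLatch(1);

        Mono.just("hey")
                // Fails inside the pipeline, so an error signal travels downstream
                .map(this::mapWithException)
                .subscribeOn(single)
                .doOnError(e -> System.out.println("ERROR1: " + e.getMessage()))
                // (value, error) callback; error is null on success
                .doOnTerminate((r, e) -> {
                    if (e != null) System.out.println("ERROR3: " + e.getMessage());
                    latch.countDown();
                })
                .subscribe(
                        // Not reached in this test: map fails before any value is emitted
                        it -> { throw new RuntimeException("Error for " + it); },
                        ex -> { System.out.println("ERROR2: " + ex.getMessage()); }
                );

        latch.await();
    }

    // Always throws; the exception has no message, so getMessage() returns null
    private String mapWithException(String s) {
        throw new RuntimeException();
    }
}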

After running the test with the code above, you should see three lines in your console:

ERROR1: null
ERROR3: null
ERROR2: null

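So the first callback is doOnError, which fires when the Mono fails; the second is doOnTerminate, because the Mono terminated with an error; and the third is the errorConsumer passed to subscribe. Each message is null because new RuntimeException() is created without a message.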
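The ordering described above follows from how operators wrap one another: each one sits between the source and the subscriber and forwards signals downstream. Below is a minimal plain-Java sketch of that idea, not Reactor itself. `Sink`, `doOnError`, `doOnTerminate` and `run` are hypothetical stand-ins written for this illustration, and threading, `subscribeOn`, and completion signals are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class SignalOrderSketch {

    /** Minimal stand-in for a subscriber: receives either a value or an error. */
    interface Sink {
        void onNext(String value);
        void onError(Throwable error);
    }

    /** Hypothetical model of doOnError: run the hook, then forward the error downstream. */
    static Sink doOnError(Consumer<Throwable> hook, Sink downstream) {
        return new Sink() {
            public void onNext(String value) { downstream.onNext(value); }
            public void onError(Throwable error) { hook.accept(error); downstream.onError(error); }
        };
    }

    /**
     * Hypothetical model of a terminate hook: called with null on success and with
     * the error on failure. Running it before value delivery is a simplification.
     */
    static Sink doOnTerminate(Consumer<Throwable> hook, Sink downstream) {
        return new Sink() {
            public void onNext(String value) { hook.accept(null); downstream.onNext(value); }
            public void onError(Throwable error) { hook.accept(error); downstream.onError(error); }
        };
    }

    /** Pushes "hey" through mapper -> doOnError -> doOnTerminate -> subscriber and returns the log. */
    public static List<String> run(Function<String, String> mapper) {
        List<String> log = new ArrayList<>();

        // Final subscriber, standing in for the two lambdas passed to subscribe(...)
        Sink subscriber = new Sink() {
            public void onNext(String value) { log.add("VALUE: " + value); }
            public void onError(Throwable error) { log.add("ERROR2: " + error.getMessage()); }
        };

        // Assemble the chain in the same order as in the test
        Sink chain = doOnError(
                e -> log.add("ERROR1: " + e.getMessage()),
                doOnTerminate(
                        e -> { if (e != null) log.add("ERROR3: " + e.getMessage()); },
                        subscriber));

        String mapped;
        try {
            mapped = mapper.apply("hey");   // upstream stage, like map(...)
        } catch (RuntimeException e) {
            chain.onError(e);               // a failure upstream becomes an error signal
            return log;
        }
        chain.onNext(mapped);
        return log;
    }

    public static void main(String[] args) {
        // Upstream failure: every hook sees the error, in assembly order
        System.out.println(run(s -> { throw new RuntimeException(); }));
        // prints [ERROR1: null, ERROR3: null, ERROR2: null]

        // No failure: no error hook prints anything
        System.out.println(run(s -> s));
        // prints [VALUE: hey]
    }
}
```

In this model an error hook only fires when the failure happens upstream of it, which is why the original test with a plain Mono.just("hey") never prints ERROR1 or ERROR3.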

Orest answered Sep 24 '22 08:09
