
Spring Reactor onErrorContinue not working

According to the documentation, I expected onErrorContinue to skip the failing element and continue the sequence. However, the test case below fails with this exception:

java.lang.AssertionError: expectation "expectNext(12)" failed (expected: onNext(12); actual: onError(java.lang.RuntimeException:

@Test
public void testOnErrorContinue() throws InterruptedException {
    Flux<Integer> fluxFromJust = Flux.just(1, 2,3,4,5)
            .concatWith(Flux.error(new RuntimeException("Test")))
            .concatWith(Flux.just(6))
            .map(i->i*2)
            .onErrorContinue((e,i)->{
                System.out.println("Error For Item +" + i );
            })
            ;
    StepVerifier
            .create(fluxFromJust)
            .expectNext(2, 4,6,8,10)
            .expectNext(12)
            .verifyComplete();
}
Niraj Sonawane asked Dec 07 '22 11:12



1 Answer

onErrorContinue() may not be doing what you think it does - it lets upstream operators recover from errors that may occur within them, if they happen to support doing so. It's a rather specialist operator.

In this case, map() does support onErrorContinue(), but map() isn't the operator producing the error: the error has already been inserted into the stream by concatWith() and the explicit Flux.error() call. In other words, no operator fails while processing an element. The error arrives as a terminal signal from the source, so there is no erroneous element to drop and nothing for onErrorContinue() to recover from.
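To make that concrete, here is a minimal sketch of the pipeline from the question, with the test rewritten to assert what the sequence actually does. The class and method names are made up for illustration, and it assumes reactor-core and reactor-test are on the classpath.

```java
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class UpstreamErrorExample {

    // Same pipeline as in the question (hypothetical method name).
    // The error is a terminal signal injected by Flux.error(),
    // not an exception thrown by map() while handling an element.
    static Flux<Integer> pipelineFromQuestion() {
        return Flux.just(1, 2, 3, 4, 5)
                .concatWith(Flux.error(new RuntimeException("Test")))
                .concatWith(Flux.just(6))
                .map(i -> i * 2)
                .onErrorContinue((e, i) -> System.out.println("Error For Item +" + i));
    }

    public static void main(String[] args) {
        // The first five elements are doubled, then the sequence
        // terminates with the error; 6 is never emitted.
        StepVerifier.create(pipelineFromQuestion())
                .expectNext(2, 4, 6, 8, 10)
                .expectErrorMessage("Test")
                .verify();
    }
}
```

This matches the AssertionError in the question: the sixth signal is onError rather than onNext(12).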

If you changed your stream so that map() actually caused the error, then it would work as expected:

Flux.just(1, 2,3,4,5)
        .map(x -> {
            if(x==5) {
                throw new RuntimeException();
            }
            return x*2;
        })
        .onErrorContinue((e,i)->{
            System.out.println("Error For Item +" + i );
        })
        .subscribe(System.out::println);

Produces:

2
4
6
8
Error For Item +5
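Because the failing element above is the last one, the output does not show the sequence carrying on afterwards. As a small variation (a sketch with a made-up class name and exception message, assuming reactor-core on the classpath), the exception can be thrown for a middle element instead:

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class ContinuePastElementExample {

    // Hypothetical helper: map() throws for element 3 only.
    static Flux<Integer> doubledSkippingThree() {
        return Flux.just(1, 2, 3, 4, 5)
                .map(x -> {
                    if (x == 3) {
                        throw new RuntimeException("bad element");
                    }
                    return x * 2;
                })
                // map() is upstream of onErrorContinue(), so the failing
                // element is dropped and the callback receives it.
                .onErrorContinue((e, i) -> System.out.println("Error For Item +" + i));
    }

    public static void main(String[] args) {
        List<Integer> result = doubledSkippingThree().collectList().block();
        System.out.println(result);
    }
}
```

Here the callback should fire once for element 3, and the collected list should contain 2, 4, 8 and 10: the elements after the failure are still processed.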

An alternative, depending on the real-world use case, may be to use onErrorResume() directly after the element (or element source) that may be erroneous:

Flux.just(1, 2, 3, 4, 5)
        .concatWith(Flux.error(new RuntimeException()))
        .onErrorResume(e -> {
            System.out.println("Error " + e + ", ignoring");
            return Mono.empty();
        })
        .concatWith(Flux.just(6))
        .map(i -> i * 2)
        .subscribe(System.out::println);
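Applied to the test from the question, the same idea could look like the sketch below. The class and method names are illustrative, and it assumes reactor-core and reactor-test are available.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

public class OnErrorResumeTestExample {

    // Hypothetical helper: the question's pipeline with onErrorResume()
    // placed right after the source that emits the error.
    static Flux<Integer> recoveredPipeline() {
        return Flux.just(1, 2, 3, 4, 5)
                .concatWith(Flux.error(new RuntimeException("Test")))
                // Replace the error with an empty sequence so that the
                // following concatWith() is still subscribed to.
                .onErrorResume(e -> Mono.empty())
                .concatWith(Flux.just(6))
                .map(i -> i * 2);
    }

    public static void main(String[] args) {
        // The expectations from the question's test now hold.
        StepVerifier.create(recoveredPipeline())
                .expectNext(2, 4, 6, 8, 10)
                .expectNext(12)
                .verifyComplete();
    }
}
```

The position of onErrorResume() matters: it has to sit before concatWith(Flux.just(6)), otherwise the error would still terminate the sequence before 6 is emitted.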

In general, using another "onError" operator (such as onErrorResume()) is the more usual and more recommended approach, since onErrorContinue() depends on operator support and affects upstream rather than downstream operators (which is unusual).
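The upstream-only behaviour can be illustrated with a sketch in which the failing map() is moved after onErrorContinue(). The names are hypothetical, it assumes reactor-core and reactor-test are on the classpath, and the expectations reflect my understanding of how the operator propagates rather than anything shown in the question.

```java
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class DownstreamNotCoveredExample {

    // Hypothetical helper: the throwing map() is downstream of
    // onErrorContinue(), so it is outside that operator's reach.
    static Flux<Integer> mapAfterOnErrorContinue() {
        return Flux.just(1, 2, 3)
                .onErrorContinue((e, i) -> System.out.println("Error For Item +" + i))
                .map(x -> {
                    if (x == 2) {
                        throw new RuntimeException("bad element");
                    }
                    return x * 2;
                });
    }

    public static void main(String[] args) {
        // Element 1 is doubled, then the exception for element 2
        // terminates the sequence instead of being skipped.
        StepVerifier.create(mapAfterOnErrorContinue())
                .expectNext(2)
                .expectErrorMessage("bad element")
                .verify();
    }
}
```

Compare this with onErrorResume(), which only handles errors coming from operators above it in the chain and is therefore easier to reason about.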

Michael Berry answered Dec 26 '22 18:12
