As per the documentation, I expect onErrorContinue to skip the failing element and continue the sequence. The test case below fails with this exception:
java.lang.AssertionError: expectation "expectNext(12)" failed (expected: onNext(12); actual: onError(java.lang.RuntimeException:
@Test
public void testOnErrorContinue() throws InterruptedException {
    Flux<Integer> fluxFromJust = Flux.just(1, 2, 3, 4, 5)
            .concatWith(Flux.error(new RuntimeException("Test")))
            .concatWith(Flux.just(6))
            .map(i -> i * 2)
            .onErrorContinue((e, i) -> {
                System.out.println("Error For Item +" + i);
            });
    StepVerifier
            .create(fluxFromJust)
            .expectNext(2, 4, 6, 8, 10)
            .expectNext(12)
            .verifyComplete();
}
onErrorContinue() may not be doing what you think it does. It lets upstream operators recover from errors that occur within them, if they happen to support doing so. It's a rather specialist operator.
In this case map() does support onErrorContinue, but map() isn't what produces the error: the error has already been inserted into the stream by concatWith() and the explicit Flux.error() call. In other words, no operator raises the error while processing an element. The error signal is itself what the source supplies, so there is no failing element to drop and nothing for onErrorContinue to recover from.
If you changed your stream so that map() actually caused the error, then it would work as expected:
Flux.just(1, 2, 3, 4, 5)
        .map(x -> {
            if (x == 5) {
                throw new RuntimeException();
            }
            return x * 2;
        })
        .onErrorContinue((e, i) -> {
            System.out.println("Error For Item +" + i);
        })
        .subscribe(System.out::println);
Produces:
2
4
6
8
Error For Item +5
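To tie this back to the original test, here is a minimal sketch of the same idea checked with StepVerifier. It assumes reactor-core and reactor-test are on the classpath. The class name OnErrorContinueCheck and the method doubledSkippingFive are hypothetical names chosen for illustration, and a 6 has been added after the failing element to show that the sequence carries on past it.

```java
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

// Hypothetical class name, for illustration only.
public class OnErrorContinueCheck {

    // The error is thrown inside map(), which supports onErrorContinue,
    // so the failing element (5) is dropped and the sequence continues with 6.
    static Flux<Integer> doubledSkippingFive() {
        return Flux.just(1, 2, 3, 4, 5, 6)
                .map(x -> {
                    if (x == 5) {
                        throw new RuntimeException("Test");
                    }
                    return x * 2;
                })
                .onErrorContinue((e, i) ->
                        System.out.println("Error For Item +" + i));
    }

    public static void main(String[] args) {
        // 10 never arrives because 5 was dropped; 12 arrives because 6 is still processed.
        StepVerifier.create(doubledSkippingFive())
                .expectNext(2, 4, 6, 8)
                .expectNext(12)
                .verifyComplete();
    }
}
```

The same verification steps can be moved into a @Test method; main is used here only to keep the sketch free of a test-framework dependency.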
An alternative, depending on the real-world use case, may be to use onErrorResume() after the element (or element source) that may be erroneous:
Flux.just(1, 2, 3, 4, 5)
        .concatWith(Flux.error(new RuntimeException()))
        .onErrorResume(e -> {
            System.out.println("Error " + e + ", ignoring");
            return Mono.empty();
        })
        .concatWith(Flux.just(6))
        .map(i -> i * 2)
        .subscribe(System.out::println);
In general, using another "onError" operator (such as onErrorResume()) is the more usual and more recommended approach, since onErrorContinue() depends on operator support and affects upstream rather than downstream operators (which is unusual).
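When the goal is to skip individual failing elements without relying on onErrorContinue(), one common pattern is to wrap the per-element work in its own inner publisher and apply onErrorResume() to that inner publisher. The following is a sketch under assumptions rather than the only way to do it: it requires reactor-core on the classpath, and the class name SkipFailedElements and the helper doubleUnlessFive are hypothetical, introduced only for this example.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical class name, for illustration only.
public class SkipFailedElements {

    // Hypothetical per-element operation: fails for 5, doubles everything else.
    static int doubleUnlessFive(int x) {
        if (x == 5) {
            throw new RuntimeException("Test");
        }
        return x * 2;
    }

    static List<Integer> run() {
        return Flux.just(1, 2, 3, 4, 5, 6)
                // Each element gets its own inner Mono, so a failure only
                // terminates that inner Mono and not the outer sequence.
                .concatMap(i -> Mono.fromCallable(() -> doubleUnlessFive(i))
                        .onErrorResume(e -> {
                            System.out.println("Error for item " + i + ", skipping");
                            return Mono.empty();
                        }))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        // Prints "Error for item 5, skipping" and then [2, 4, 6, 8, 12]
        System.out.println(run());
    }
}
```

Because the error handling sits on the inner Mono, it behaves like any other downstream error operator and does not depend on whether a particular upstream operator supports onErrorContinue(). concatMap is used here to preserve element order; flatMap would also work if ordering does not matter.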