In my Production code, I am getting errors in my logs when a Mono times out.
I have managed to recreate these errors with the following code:
@Test
public void testScheduler() {
Mono<String> callableMethod1 = callableMethod();
callableMethod1.block();
Mono<String> callableMethod2 = callableMethod();
callableMethod2.block();
}
private Mono<String> callableMethod() {
return Mono.fromCallable(() -> {
Thread.sleep(60);
return "Success";
})
.subscribeOn(Schedulers.elastic())
.timeout(Duration.ofMillis(50))
.onErrorResume(throwable -> Mono.just("Timeout"));
}
In the Mono.fromCallable
I am making a blocking call using a third-party library. When this call times out, I get errors similar to
reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.publisher.Operators - Scheduler worker in group main failed with an uncaught exception
These errors also seem to be intermittent, sometimes when I run the code provided I get no errors at all. However when I repeat the call in a loop of say 10, I consistently get them.
Question: Why does this error happen?
Answer:
When the duration given to the timeout() operator has passed, it throws a TimeoutException. That results in the following outcomes:
An onError signal is sent to the main reactive chain. As a result, the main execution is resumed and the process moves on (i.e., onErrorResume() is executed).
Shortly after outcome #1, the async task defined within fromCallable() is interrupted, which triggers a 2nd exception (InterruptedException). The main reactive chain can no longer handle this InterruptedException because the TimeoutException happened first and already caused the main reactive chain to resume (Note: this behavior of not generating a 2nd onError signal conforms with the Reactive Stream Specification -> Publisher #7).
Since the 2nd exception (InterruptedException) can't be handled gracefully by the main chain, Reactor logs it at error level to let us know an unexpected exception occurred.
Question: How do I get rid of them?
Short Answer: Use Hooks.onErrorDropped() to change the log level:
Logger logger = Logger.getLogger(this.getClass().getName());
@Test
public void test() {
Hooks.onErrorDropped(error -> {
logger.log(Level.WARNING, "Exception happened:", error);
});
Mono.fromCallable(() -> {
Thread.sleep(60);
return "Success";
})
.subscribeOn(Schedulers.elastic())
.timeout(Duration.ofMillis(50))
.onErrorResume(throwable -> Mono.just("Timeout"))
.doOnSuccess(result -> logger.info("Result: " + result))
.block();
}
Long Answer: If your use-case allows, you could handle the exception happening within fromCallable() so that the only exception affecting the main chain is the TimeoutException. In that case, the onErrorDropped() wouldn't happen in the first place.
@Test
public void test() {
Mono.fromCallable(() -> {
try {
Thread.sleep(60);
} catch (InterruptedException ex) {
//release resources, rollback actions, etc
logger.log(Level.WARNING, "Something went wrong...", ex);
}
return "Success";
})
.subscribeOn(Schedulers.elastic())
.timeout(Duration.ofMillis(50))
.onErrorResume(throwable -> Mono.just("Timeout"))
.doOnSuccess(result -> logger.info("Result: " + result))
.block();
}
Extra References:
https://tacogrammer.com/onerrordropped-explained/
https://medium.com/@kalpads/configuring-timeouts-in-spring-reactive-webclient-4bc5faf56411
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With