I have an array of URLs and ports. For each of them, I need to send a message and receive a response:
Flux.fromArray(trackersArray)
    .flatMap(tracker ->
        ConnectToTracker.connect(tracker.getTracker(), tracker.getPort()))
I communicate with the servers over UDP, so I can't tell whether a server is alive unless I send it a message that, by the protocol's rules, it must respond to.
ConnectToTracker.connect may send an onNext signal if the server responds, or an onError signal if, for example, the server doesn't respond (SocketTimeoutException) or on any other failure (a general IOException).
I don't want to terminate the flux if, for example, the onError signal carries a SocketTimeoutException. Instead, I would like to try communicating with every tracker I have.
This link lists all the operators I can use to handle errors, but none of them simply ignores an error.
I'm using Reactor 3, if this matters.
Update:
I came up with an ugly trick, but it works:
Flux.fromArray(trackersArray)
    .handle((Tracker tracker, SynchronousSink<ConnectResponse> sink) -> {
        ConnectToTracker.connect(tracker.getTracker(), tracker.getPort())
            .subscribe(sink::next, error -> {
                if (!(error instanceof SocketTimeoutException))
                    sink.error(error);
            }, sink::complete);
    })
Please feel free to answer if you have anything better.
Since you are already processing the URLs in a flatMap, use onErrorResume(e -> Mono.empty()). This lets flatMap ignore the error.
Edit: apply it within the flatMap, on the right-hand side of the lambda.
Flux.fromArray(trackersArray)
    .flatMap(tracker ->
        ConnectToTracker.connect(tracker.getTracker(), tracker.getPort())
            .onErrorResume(SocketTimeoutException.class, __ -> Mono.empty()))
Maybe this is a better way of doing the same thing: it recovers only from SocketTimeoutException, and any other exception still goes to onError.
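To see this behavior in isolation, here is a minimal, self-contained sketch. The connect method below is a hypothetical stand-in for ConnectToTracker.connect that fails depending on the host name, and the names SkipTimeouts and connectAll are made up for illustration; only flatMap and onErrorResume are actual Reactor operators.

```java
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class SkipTimeouts {

    // Hypothetical stand-in for ConnectToTracker.connect (not the asker's real code):
    // hosts starting with "slow" time out, hosts starting with "broken" fail with IOException.
    static Mono<String> connect(String host) {
        if (host.startsWith("slow")) {
            return Mono.error(new SocketTimeoutException("no reply from " + host));
        }
        if (host.startsWith("broken")) {
            return Mono.error(new IOException("I/O failure on " + host));
        }
        return Mono.just("connected:" + host);
    }

    // A timeout turns that one inner Mono into an empty one, so the outer Flux keeps going.
    // Any other error is not matched by the type filter and still terminates the Flux.
    static Flux<String> connectAll(List<String> hosts) {
        return Flux.fromIterable(hosts)
                .flatMap(host -> connect(host)
                        .onErrorResume(SocketTimeoutException.class, e -> Mono.empty()));
    }

    public static void main(String[] args) {
        // prints [connected:a, connected:c]
        System.out.println(connectAll(List.of("a", "slow-b", "c")).collectList().block());
    }
}
```

Because onErrorResume is attached to the inner Mono rather than to the outer Flux, the recovery is scoped to a single tracker; attaching it after flatMap would instead replace the rest of the sequence once the first timeout occurs.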
Now we have reactor.core.publisher.Flux.onErrorContinue() in version 3.3.2, which lets the sequence keep sending onNext() signals for the remaining elements when some element results in onError(). Add log() to the chain and you will see the behavior more clearly.
The signature of the handler is (throwable, instance), so it is useful if you want to log the element that errored out.
Flux.fromIterable(aList)
    .flatMap(this::xxxx)
    .onErrorContinue((throwable, o) -> {
        log.error("Error while processing {}. Cause: {}", o, throwable.getMessage());
    })
....
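As a rough, self-contained illustration of the same idea, the sketch below uses map instead of flatMap so that the failing element is handed to the handler in an obvious way. The class ContinueOnError and the methods checked and process are hypothetical names, not part of Reactor. Note that onErrorContinue relies on support from the upstream operators, so it is worth checking the behavior for the operators you actually use.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

public class ContinueOnError {

    // Hypothetical per-element step: rejects negative values, multiplies the rest by 10.
    static int checked(int value) {
        if (value < 0) {
            throw new IllegalArgumentException("negative: " + value);
        }
        return value * 10;
    }

    // Elements whose processing throws are recorded in 'dropped' and skipped;
    // the sequence continues with the next element instead of terminating.
    static List<Integer> process(List<Integer> input, List<Object> dropped) {
        return Flux.fromIterable(input)
                .map(ContinueOnError::checked)
                .onErrorContinue((throwable, element) -> dropped.add(element))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        List<Object> dropped = new ArrayList<>();
        List<Integer> out = process(List.of(1, -2, 3), dropped);
        // prints [10, 30] dropped=[-2]
        System.out.println(out + " dropped=" + dropped);
    }
}
```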