
Is there a way in Reactor to ignore error signals?

I have an array of multiple URLs and ports. For each of them, I need to send and receive something back:

Flux.fromArray(trackersArray)
    .flatMap(tracker -> 
               ConnectToTracker.connect(tracker.getTracker(), tracker.getPort()))

I communicate with the servers over UDP, so I can't tell whether a server is alive unless I send it a message which, by some set of rules, it needs to respond to.

ConnectToTracker.connect may send an onNext signal if the server responds, or an onError signal if, for example, the server doesn't respond (SocketTimeoutException) or any other failure occurs (a general IOException).

I don't want to terminate the flux if, for example, the onError signal is a SocketTimeoutException. Instead, I would like to try communicating with every tracker I have.

This link contains all the operations I can use to handle errors but not ignore them.

I'm using Reactor 3 if this matters.

Update:

I came up with an ugly trick, but it works:

Flux.fromArray(trackersArray)
        .handle((Tracker tracker, SynchronousSink<ConnectResponse> sink) -> {
            ConnectToTracker.connect(tracker.getTracker(), tracker.getPort())
                    .subscribe(sink::next, error -> {
                        if (!(error instanceof SocketTimeoutException))
                            sink.error(error);
                    }, sink::complete);
        })

Please feel free to answer if you have anything better.

Stav Alfi asked Jan 13 '18 19:01


3 Answers

Since you are already processing the URLs in a flatMap, use onErrorResume(e -> Mono.empty()). This lets flatMap ignore the error. Edit: apply it within the flatMap, on the right-hand side of the lambda.
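
To make the placement concrete, here is a minimal, self-contained sketch of that idea. The `connect` method is a hypothetical stand-in for `ConnectToTracker.connect` (the real one is not shown in the question), and the "dead"/"broken" name prefixes are assumptions used only to simulate a timeout and a general I/O failure. It uses the type-filtered overload of `onErrorResume`, so only timeouts are swallowed.

```java
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class IgnoreTimeouts {

    // Hypothetical stand-in for ConnectToTracker.connect:
    // "dead*" trackers time out, "broken*" trackers fail with a general
    // IOException, and every other tracker answers with a string.
    static Mono<String> connect(String tracker) {
        if (tracker.startsWith("dead")) {
            return Mono.error(new SocketTimeoutException(tracker + " timed out"));
        }
        if (tracker.startsWith("broken")) {
            return Mono.error(new IOException(tracker + " failed"));
        }
        return Mono.just("response from " + tracker);
    }

    static List<String> connectAll(String... trackers) {
        return Flux.fromArray(trackers)
                .flatMap(tracker -> connect(tracker)
                        // Applied to the inner Mono: a timeout completes this
                        // inner publisher empty, so the outer Flux keeps going.
                        .onErrorResume(SocketTimeoutException.class, e -> Mono.empty()))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        // The timed-out tracker is skipped; the other two responses remain.
        System.out.println(connectAll("alive-1", "dead-1", "alive-2"));
    }
}
```

Because the operator sits on the inner Mono rather than on the outer Flux, a timeout only drops that one tracker. Any error that is not a SocketTimeoutException (the "broken" case here) still propagates and terminates the sequence.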

Simon Baslé answered Sep 27 '22 16:09


Flux.fromArray(trackersArray)
.flatMap(tracker -> 
           ConnectToTracker.connect(tracker.getTracker(), tracker.getPort())
                .onErrorResume(SocketTimeoutException.class, __ -> Mono.empty()))

This may be a better way of doing the same thing: it recovers only from SocketTimeoutException, and any other exception still goes to onError.

Enzo Bonggio answered Sep 27 '22 17:09


As of version 3.3.2, Reactor has Flux.onErrorContinue(), which lets the sequence keep emitting onNext() signals when some elements fail with onError(). Add log() to see this more clearly.

The callback receives (throwable, instance), which is useful if you want to log the element that failed.

Flux.fromIterable(aList)
    .flatMap(this::xxxx)
    .onErrorContinue((throwable, o) -> {
        log.error("Error while processing {}. Cause: {}", o, throwable.getMessage());
    })
    ....

WesternGun answered Sep 27 '22 17:09