I´m trying to figure out how handle errors when mapping elements inside a Flux.
For instance, I´m parsing a CSV string into one of my business POJOs:
myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock));
Some of this lines might contain errors, so what I get in the log is:
reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.09 (Fri Apr 08 00:00:00 CEST 2016) H(38.419998)/L(37.849998)/O(37.970001)) reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) H(38.189999)/L(37.610001)/O(37.799999)) reactor.core.publisher.FluxLog: onError(java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo) reactor.core.publisher.FluxLog: java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo
I read in the API some error handling methods, but most refered to returning an "error value" or using a fallback Flux, like this one:
Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... do stuff);
However, using this with my myflux
means that the whole flux is processed again.
So, is there a way to handle errors while processing particular elements (I.e ignoring them/Logging them) and keep processing the rest of the flux?
UPDATE with @akarnokd workaround
public Flux<StockQuotation> getQuotes(List<String> tickers) { Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers) // Get each set of quotes in a separate thread .flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s))) // Convert each list of raw quotes string in a new Flux<String> .flatMap(list -> Flux.fromIterable(list)) // Convert the string to POJOs .flatMap(x -> { try { return Flux.just(converter.convertHistoricalCSVToStockQuotation(x)); } catch (IllegalArgumentException ex){ System.out.println("Error decoding stock quotation: " + x); return Flux.empty(); } }); return processingFlux; }
This works as a charm, however, as you can see the code is less elegant than before. Does not the Flux API have any method to do what this code does?
retry(...) retryWhen(...) onErrorResumeWith(...) onErrorReturn(...)
While Initialising WebClient As mentioned in the code block, whenever a 5XX/4XX Error occurs, we can throw a user defined exception, and then execute error handling logic based on those user defined exceptions. Once this error Handler is defined, we can add it in the WebClient Initialisation.
You need flatMap
instead which let's you return an empty sequence if the processing failed:
myflux.flatMap(v -> { try { return Flux.just(converter.convertHistoricalCSVToStockQuotation(stock)); } catch (IllegalArgumentException ex) { return Flux.empty(); } });
If you want to use Reactor 3's methods for dealing with exceptions, you can use Mono.fromCallable
.
flatMap(x -> Mono.fromCallable(() -> converter.convertHistoricalCSVToStockQuotation(x)) .flux() .flatMap(Flux::fromIterable) .onErrorResume(Flux::empty) )
Unfortunately there is no Flux.fromCallable
, so assuming the callable returns a list, you have to convert it to a Flux manually.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With