Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to handle error while executing Flux.map()

I´m trying to figure out how handle errors when mapping elements inside a Flux.

For instance, I´m parsing a CSV string into one of my business POJOs:

myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock)); 

Some of this lines might contain errors, so what I get in the log is:

 reactor.core.publisher.FluxLog:  onNext([SOME_BOGUS_QUOTE]@38.09 (Fri Apr 08 00:00:00 CEST 2016) H(38.419998)/L(37.849998)/O(37.970001))  reactor.core.publisher.FluxLog:  onNext([SOME_BOGUS_QUOTE]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) H(38.189999)/L(37.610001)/O(37.799999))  reactor.core.publisher.FluxLog:  onError(java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo)  reactor.core.publisher.FluxLog:  java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo 

I read in the API some error handling methods, but most refered to returning an "error value" or using a fallback Flux, like this one:

Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... do stuff); 

However, using this with my myflux means that the whole flux is processed again.

So, is there a way to handle errors while processing particular elements (I.e ignoring them/Logging them) and keep processing the rest of the flux?

UPDATE with @akarnokd workaround

public Flux<StockQuotation> getQuotes(List<String> tickers) {     Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers)     // Get each set of quotes in a separate thread     .flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))     // Convert each list of raw quotes string in a new Flux<String>     .flatMap(list -> Flux.fromIterable(list))     // Convert the string to POJOs     .flatMap(x -> {             try {                 return Flux.just(converter.convertHistoricalCSVToStockQuotation(x));                 }             catch (IllegalArgumentException ex){                 System.out.println("Error decoding stock quotation: " + x);                 return Flux.empty();             }     });      return processingFlux; } 

This works as a charm, however, as you can see the code is less elegant than before. Does not the Flux API have any method to do what this code does?

retry(...) retryWhen(...) onErrorResumeWith(...) onErrorReturn(...) 
like image 297
Victor Avatar asked Mar 26 '16 15:03

Victor


People also ask

How does WebClient handle error response?

While Initialising WebClient As mentioned in the code block, whenever a 5XX/4XX Error occurs, we can throw a user defined exception, and then execute error handling logic based on those user defined exceptions. Once this error Handler is defined, we can add it in the WebClient Initialisation.


2 Answers

You need flatMap instead which let's you return an empty sequence if the processing failed:

myflux.flatMap(v -> {     try {         return Flux.just(converter.convertHistoricalCSVToStockQuotation(stock));     } catch (IllegalArgumentException ex) {         return Flux.empty();     } }); 
like image 67
akarnokd Avatar answered Sep 23 '22 02:09

akarnokd


If you want to use Reactor 3's methods for dealing with exceptions, you can use Mono.fromCallable.

flatMap(x ->      Mono.fromCallable(() -> converter.convertHistoricalCSVToStockQuotation(x))         .flux()         .flatMap(Flux::fromIterable)         .onErrorResume(Flux::empty) ) 

Unfortunately there is no Flux.fromCallable, so assuming the callable returns a list, you have to convert it to a Flux manually.

like image 23
Tsvetan Ovedenski Avatar answered Sep 22 '22 02:09

Tsvetan Ovedenski