I have created a parallet flux from iterable. And on each iterable I have to make a rest call. But while executing even if any of the request fails , all the remaining requests also fail. I want all the requests to be executed irrespective of failure or success.
I am currently using Flux.fromIterable and using runOn operator
Flux.fromIterable(actions)
.parallel()
.runOn(Schedulars.elastic())
.flatMap(request -> someRemoteCall)
.sequential()
.subscribe();
I want all the requests in iterable to be executed , irrespective of the failure or success. But as of now some gets executed and some gets failed.
To perform parallel execution using ParallelFlux , first you need to convert a Flux<T> into a ParallelFlux<T> . After that, you have to call runOn(Scheduler) to specify in which scheduler the elements should be processed. Without that, it cannot be processed in parallel.
ParallelFlux is created from an existing Flux , using the parallel operator. By default, this splits the stream into the total number of CPU cores that are available. ParallelFlux only divides the stream, and does not change the execution model. Instead, it executes the streams on the default thread—the main thread.
flatMap() operator We use flatMap() when the transformation returns a Flux or Mono. The flatMap() flattens the result and extracts the data from the Mono. So we should use it when we know that we will have one of the Reactive Types as the data source. That would be all regarding how to transform Flux and Mono in Java.
Flux#fromIterable() method can be used to create a Flux that emits the items contained in the provided Iterable.
There's three possible ways I generally use to achieve this:
flatMap()
, the second of which is a mapperOnError
-eg. .flatMap(request -> someRemoteCall(), x->Mono.empty(), null)
;onErrorResume(x -> Mono.empty())
as a separate call to ignore any error;.onErrorResume(MyException.class, x -> Mono.empty()))
to just ignore errors of a certain type.The second is what I tend to use by default, as I find that clearest.
Because of .parallel().runOn(...)
usage you can't use onErrorContinue
as below:
.parallel()
.runOn(...)
.flatMap(request -> someRemoteCall)
.onErrorContinue(...)
but you might be able to use it like this:
.parallel().runOn(...)
.flatMap(request -> someRemoteCall
.onErrorContinue((t, o) -> log.error("Skipped error: {}", t.getMessage()))
)
provided that someRemoteCall
is a Mono
or Flux
not itself run on .parallel().runOn(...)
rails.
But when you don't have a someRemoteCall
you can do the trick below (see NOT_MONO_AND_NOT_FLUX
) to ignore the unsafe processing run on .parallel().runOn(...)
rails:
Optional<List<String>> foundImageNames =
Flux.fromStream(this.fileStoreService.walk(path))
.parallel(cpus, cpus)
.runOn(Schedulers.newBoundedElastic(cpus, Integer.MAX_VALUE, "import"), 1)
.flatMap(NOT_MONO_AND_NOT_FLUX -> Mono
.just(NOT_MONO_AND_NOT_FLUX)
.map(path -> sneak(() -> unsafeLocalSimpleProcessingReturningString(path)))
.onErrorContinue(FileNotFoundException.class,
(t, o) -> log.error("File missing:\n{}", t.getMessage()))
)
.collectSortedList(Comparator.naturalOrder())
.blockOptional();
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With