Dealing with parallel flux in Reactor

I have created a parallel flux from an iterable, and for each element I have to make a REST call. But during execution, if any one request fails, all the remaining requests fail as well. I want all the requests to be executed irrespective of failure or success.

I am currently using Flux.fromIterable with the runOn operator:

Flux.fromIterable(actions)
.parallel()
.runOn(Schedulers.elastic())
.flatMap(request -> someRemoteCall)
.sequential()
.subscribe();

I want every request in the iterable to be executed, irrespective of failure or success. As of now, some requests are executed and the rest fail.

naval jain asked Aug 17 '19 08:08


2 Answers

There are three possible ways I generally use to achieve this:

  • Use the 3-argument version of flatMap(), the second argument of which is a mapperOnError, e.g. .flatMap(request -> someRemoteCall(), x -> Mono.empty(), null);
  • Use onErrorResume(x -> Mono.empty()) as a separate call to ignore any error;
  • Use .onErrorResume(MyException.class, x -> Mono.empty()) to ignore only errors of a certain type.

The second is what I tend to use by default, as I find that clearest.
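
As a minimal sketch of the second option applied to the question's pipeline, the example below attaches onErrorResume to the inner Mono returned by each call, so a failure is swallowed before it can reach the parallel rail. The class name IsolatedFailures, the runAll helper, and the stubbed someRemoteCall (which fails for the input "b") are assumptions made for illustration, and Schedulers.boundedElastic() is used here in place of the deprecated Schedulers.elastic().

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class IsolatedFailures {

    // Hypothetical stand-in for the remote call: it fails for the input "b".
    static Mono<String> someRemoteCall(String request) {
        return Mono.fromCallable(() -> {
            if (request.equals("b")) {
                throw new IllegalStateException("remote call failed for " + request);
            }
            return "ok:" + request;
        });
    }

    // Runs every request on parallel rails and collects the successful results.
    static List<String> runAll(List<String> actions) {
        return Flux.fromIterable(actions)
                .parallel()
                .runOn(Schedulers.boundedElastic())
                .flatMap(request -> someRemoteCall(request)
                        // Recover on the inner Mono so the error never
                        // propagates to the rail and cancels the other requests.
                        .onErrorResume(e -> Mono.empty()))
                .sequential()
                // Rails complete in no fixed order, so sort for a stable result.
                .collectSortedList()
                .block();
    }

    public static void main(String[] args) {
        // "b" fails and is skipped; the other three requests still complete.
        System.out.println(runAll(List.of("a", "b", "c", "d"))); // [ok:a, ok:c, ok:d]
    }
}
```

In real code you would probably log the error inside onErrorResume rather than drop it silently, and you might return a fallback value instead of Mono.empty() if the caller needs one result per request.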

Michael Berry answered Sep 29 '22 11:09


Because of the .parallel().runOn(...) usage, you can't use onErrorContinue as below:

.parallel()
.runOn(...)
.flatMap(request -> someRemoteCall)
.onErrorContinue(...)

but you might be able to use it like this:

.parallel().runOn(...)
.flatMap(request -> someRemoteCall
        .onErrorContinue((t, o) -> log.error("Skipped error: {}", t.getMessage()))
)

provided that someRemoteCall is a Mono or Flux that is not itself run on .parallel().runOn(...) rails.

If you don't have a someRemoteCall that returns a Mono or Flux, you can use the trick below (see NOT_MONO_AND_NOT_FLUX): wrap each element in a Mono, so that errors from unsafe processing run on the .parallel().runOn(...) rails can be skipped:

Optional<List<String>> foundImageNames = 
    Flux.fromStream(this.fileStoreService.walk(path))
        .parallel(cpus, cpus)
        .runOn(Schedulers.newBoundedElastic(cpus, Integer.MAX_VALUE, "import"), 1)

        .flatMap(NOT_MONO_AND_NOT_FLUX -> Mono
            .just(NOT_MONO_AND_NOT_FLUX)
            // Lambda parameter named "file" so it does not clash with the outer "path" variable.
            .map(file -> sneak(() -> unsafeLocalSimpleProcessingReturningString(file)))
            .onErrorContinue(FileNotFoundException.class,
                (t, o) -> log.error("File missing:\n{}", t.getMessage()))
        )

        .collectSortedList(Comparator.naturalOrder())
        .blockOptional();
adrhc answered Sep 29 '22 12:09