Project Reactor: async doOnNext (or other doOnXXX operators)

Is there a method like doOnNext, but asynchronous? For example, I need to do some slow logging (sending a notification by email) for a particular element.

Scheduler myParallel = Schedulers.newParallel("my-parallel", 4);

Flux<Integer> ints = Flux.just(1, 2, 3, 4, 5)
        .publishOn(myParallel)
        .doOnNext(v -> {
            // For example, we need to do something time-consuming only for 3

            if (v.equals(3)) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            System.out.println("LOG FOR " + v);
        });

ints.subscribe(System.out::println);

But why should I wait for the logging of 3? I want to run this logic asynchronously.

For now, this is the only solution I have:

Thread.sleep(10000);

Scheduler myParallel = Schedulers.newParallel("my-parallel", 4);
Scheduler myParallel2 = Schedulers.newParallel("my-parallel2", 4);

Flux<Integer> ints = Flux.just(1, 2, 3, 4, 5)
        .publishOn(myParallel)
        .doOnNext(v -> {
            Mono.just(v).publishOn(myParallel2).subscribe(value -> {
                if (value.equals(3)) {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                System.out.println("LOG FOR " + value);
            });
        });

ints.subscribe(System.out::println);

Is there any "nice" solution for this?

Вадим Парафенюк asked Dec 12 '18 14:12


1 Answer

If you're absolutely sure you don't care whether or not the email sending succeeds, then you could use "subscribe-inside-doOnNext", but I'm pretty confident that would be a mistake.

In order to have your Flux propagate an onError signal if the "logging" fails, the recommended approach is to use flatMap.
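
To make the error-propagation point concrete, here is a minimal sketch. It uses the simplest flatMap form, which waits for the email before re-emitting the value. The `sendEmail` method is a hypothetical stand-in that always fails for 3; it is not part of Reactor or of the question's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ErrorPropagationDemo {

    // Hypothetical email call (an assumption for illustration): fails for the value 3.
    static Mono<Void> sendEmail(Integer v) {
        if (v == 3) {
            return Mono.error(new IllegalStateException("mail server down for " + v));
        }
        return Mono.empty();
    }

    // Runs the pipeline, collects emitted values into `received`,
    // and returns the error the subscriber saw (or null if there was none).
    static Throwable run(List<Integer> received) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Flux.just(1, 2, 3, 4, 5)
            // wait for the email Mono, then re-emit the original value
            .flatMap(v -> sendEmail(v).thenReturn(v))
            .subscribe(received::add, failure::set);
        return failure.get();
    }

    public static void main(String[] args) {
        List<Integer> received = new ArrayList<>();
        Throwable error = run(received);
        System.out.println("received " + received + ", error " + error);
    }
}
```

Because nothing here switches threads, the failure for 3 reaches the subscriber's error callback after 1 and 2 have been delivered, and 4 and 5 are never emitted. With subscribe-inside-doOnNext the main sequence would not see that failure at all.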

The good news is that since flatMap merges results from the inner publishers immediately into the main sequence, you can still emit each element immediately AND trigger the email. The only caveat is that the whole thing will only complete once the email-sending Mono has also completed. You can also check within the flatMap lambda whether the logging needs to happen at all (rather than inside the inner Mono):

// assuming sendEmail returns a Mono<Void> and takes care of offloading any blocking send onto another Scheduler

source // we assume elements are also publishOn'd as relevant in `source`
    .flatMap(v -> {
        // if we can decide right away whether or not to send the email, better do it here
        if (shouldSendEmailFor(v)) {
            // immediately re-emit the value, then trigger the email and wait for it to complete
            return Mono.just(v)
                .concatWith(
                    // since Mono<Void> never emits onNext, it is OK to cast it to V,
                    // which makes it compatible with concat, keeping the whole thing a Flux<V>
                    sendEmail(v).cast(V.class)
                );
        } else {
            return Mono.just(v);
        }
    });
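
A self-contained version of the snippet above might look like the following. This is only a sketch under stated assumptions: `sendEmail` and `shouldSendEmailFor` are hypothetical stand-ins (the answer leaves them undefined), the 300 ms sleep simulates a slow mail call, and `Schedulers.boundedElastic()` assumes Reactor 3.3 or later.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class AsyncEmailDemo {

    // Records which values an "email" was sent for, so the side effect is observable.
    static final List<Integer> sent = new CopyOnWriteArrayList<>();

    // Hypothetical stand-in for sendEmail: wraps a blocking call and moves it
    // onto the boundedElastic scheduler so the main sequence is not blocked.
    static Mono<Void> sendEmail(Integer v) {
        return Mono.<Void>fromRunnable(() -> {
            try {
                Thread.sleep(300); // simulated slow send
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            sent.add(v);
        }).subscribeOn(Schedulers.boundedElastic());
    }

    // Hypothetical predicate: only the value 3 needs an email.
    static boolean shouldSendEmailFor(Integer v) {
        return v == 3;
    }

    // Re-emits v right away; when an email is needed, the inner sequence
    // completes only after the email Mono has completed.
    static Flux<Integer> emitThenEmail(Integer v) {
        if (shouldSendEmailFor(v)) {
            return Mono.just(v).concatWith(sendEmail(v).cast(Integer.class));
        }
        return Flux.just(v);
    }

    static Flux<Integer> pipeline() {
        return Flux.just(1, 2, 3, 4, 5).flatMap(AsyncEmailDemo::emitThenEmail);
    }

    public static void main(String[] args) {
        List<Integer> seen = pipeline()
            .doOnNext(v -> System.out.println("got " + v))
            .collectList()
            .block(Duration.ofSeconds(5));
        System.out.println(seen + ", emailed for " + sent);
    }
}
```

Here 4 and 5 are printed without waiting for the simulated send for 3, while `collectList()` only resolves once that send has finished, which matches the caveat about completion described above.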

Simon Baslé answered Oct 06 '22 11:10