Is there a method like doOnNext, but async? For example, I need to do some slow logging (sending a notification by email) for a particular element.
Scheduler myParallel = Schedulers.newParallel("my-parallel", 4);
Flux<Integer> ints = Flux.just(1, 2, 3, 4, 5)
    .publishOn(myParallel)
    .doOnNext(v -> {
        // For example, we need to do something time-consuming only for 3
        if (v.equals(3)) {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("LOG FOR " + v);
    });
ints.subscribe(System.out::println);
But why should I have to wait for the logging of 3? I want to run this logic asynchronously.
The only solution I have right now is this:
Thread.sleep(10000);
Scheduler myParallel = Schedulers.newParallel("my-parallel", 4);
Scheduler myParallel2 = Schedulers.newParallel("my-parallel2", 4);
Flux<Integer> ints = Flux.just(1, 2, 3, 4, 5)
    .publishOn(myParallel)
    .doOnNext(v -> {
        Mono.just(v).publishOn(myParallel2).subscribe(value -> {
            if (value.equals(3)) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("LOG FOR " + value);
        });
    });
ints.subscribe(System.out::println);
Is there any "nice" solution for this?
If you're absolutely sure you don't care whether or not the email sending succeeds, then you could use "subscribe-inside-doOnNext", but I'm pretty confident that would be a mistake.
In order to have your Flux propagate an onError signal if the "logging" fails, the recommended approach is to use flatMap.
The good news is that since flatMap merges results from the inner publishers immediately into the main sequence, you can still emit each element immediately AND trigger the email. The only caveat is that the whole thing will only complete once the email-sending Mono has also completed. You can also check within the flatMap lambda if the logging needs to happen at all (rather than inside the inner Mono):
//assuming sendEmail returns a Mono<Void> and takes care of offsetting any blocking send onto another Scheduler
source //we assume elements are also publishOn as relevant in `source`
    .flatMap(v -> {
        //if we can decide right away whether or not to send email, better do it here
        if (shouldSendEmailFor(v)) {
            //we want to immediately re-emit the value, then trigger email and wait for it to complete
            return Mono.just(v)
                .concatWith(
                    //since Mono<Void> never emits onNext, it is ok to cast it to V
                    //which makes it compatible with concat, keeping the whole thing a Flux<V>
                    sendEmail(v).cast(V.class)
                );
        } else {
            return Mono.just(v);
        }
    });
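To try out the shape of this idea without a Reactor dependency, here is a minimal sketch using only the JDK's CompletableFuture. It is an analogy, not Reactor code and not the flatMap operator: every value is handed on immediately, the slow side effect runs on another thread, and the overall call returns only after the pending side effects have finished, so a failed send surfaces instead of being silently dropped. The class AsyncSideEffectSketch, the method process, the event strings, and the 300 ms sleep are all hypothetical; shouldSendEmailFor and sendEmail reuse the placeholder names from the snippet above, with extra parameters added here for the sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AsyncSideEffectSketch {

    // Hypothetical predicate: only element 3 needs the slow notification.
    static boolean shouldSendEmailFor(int v) {
        return v == 3;
    }

    // Hypothetical stand-in for sending an email: slow work on a separate thread.
    static CompletableFuture<Void> sendEmail(int v, ExecutorService pool, List<String> events) {
        return CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(300); // simulated slow send (assumed duration)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("email interrupted for " + v, e);
            }
            events.add("email sent for " + v);
        }, pool);
    }

    // Records each value right away; returns only once all pending emails are done.
    static List<String> process(List<Integer> source) {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        try {
            for (int v : source) {
                events.add("emitted " + v); // not delayed by the email
                if (shouldSendEmailFor(v)) {
                    pending.add(sendEmail(v, pool, events));
                }
            }
            // join() rethrows a failed send as CompletionException,
            // which is the rough counterpart of an onError signal.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return events;
    }

    public static void main(String[] args) {
        process(List.of(1, 2, 3, 4, 5)).forEach(System.out::println);
    }
}
```

The trade-off matches the caveat above: elements 4 and 5 are not held up by the slow send for 3, but process does not return until that send has finished. Dropping the join() call would turn this into fire-and-forget, with the same risk of losing errors as subscribing inside doOnNext.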