I have the following reactive code using Flux in Reactor Core:
Flux.create(sink -> ... /* listens to and receives from external source */, FluxSink.OverflowStrategy.LATEST)
    .flatMap(map -> redisHashReactiveCommands.hmset(key, map))
    //.flatMap(...) // want to store the same data async into Kafka, with its own backpressure handling
    .subscribeOn(Schedulers.parallel())
    .doOnNext(s -> log.debug("Redis consumed. Result -> {}", s))
    .doOnComplete(() -> log.debug("On completed."))
    .doOnError(exception -> log.error("Error occurred while consuming message", exception))
    .subscribe();
As you can see, I have backpressure handling for the external source into my process (FluxSink.OverflowStrategy.LATEST). However, I also want to configure backpressure from my process to Redis (redisHashReactiveCommands.hmset(key, map)), since Redis can be a bigger bottleneck than the external source. I expect I'd need to create another Flux for the Redis part and link it with this one, but how do I achieve that, given that .flatMap works on an individual item and not a stream of items?
Also, I want to store the same emitted item into Kafka as well, but chaining flatMaps doesn't seem to work. Is there an easy way to link all of these together in one set of functional calls (external source -> my process, my process -> Redis, my process -> Kafka)?
If you want to operate on the whole stream as a batch rather than on individual items, you can use the collectList() method on Flux to transform a Flux<Foo> into a Mono<List<Foo>>.
A Flux object represents a reactive sequence of 0..N items, while a Mono object represents a single-value-or-empty (0..1) result. This distinction carries a bit of semantic information into the type, indicating the rough cardinality of the asynchronous processing.
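For illustration, here is a minimal, self-contained sketch of that transformation (plain String items stand in for Foo). Note that collectList() only emits once the source completes, so it isn't suitable for an infinite external source like the one in the question:

import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CollectListSketch {
    public static void main(String[] args) {
        // collectList() buffers every element and emits a single List
        // when the upstream Flux completes.
        Flux<String> items = Flux.just("a", "b", "c");
        Mono<List<String>> batched = items.collectList();
        batched.subscribe(list -> System.out.println("Got batch: " + list));
    }
}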
Another building block worth knowing about is ConnectableFlux (public abstract class ConnectableFlux<T> extends Flux<T>), the abstract base class for connectable publishers that let subscribers pile up before they connect to their data source. That makes it a natural fit when you want to multicast one source to several independent subscribers (for example Redis and Kafka).
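As a hedged sketch of that idea (a synchronous Flux.range stands in for the external source, and the println subscribers stand in for the Redis and Kafka pipelines), each subscriber can pick its own backpressure strategy before connect() starts the flow:

import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;

public class MulticastSketch {
    public static void main(String[] args) {
        // publish() returns a ConnectableFlux: subscribers accumulate,
        // and nothing is emitted until connect() is called.
        ConnectableFlux<Integer> source = Flux.range(1, 100).publish();

        // Each pipeline applies its own backpressure policy independently.
        source.onBackpressureBuffer(10)
              .subscribe(i -> System.out.println("redis pipeline: " + i));
        source.onBackpressureLatest()
              .subscribe(i -> System.out.println("kafka pipeline: " + i));

        source.connect();
    }
}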
If you're not interested in the result objects in the main sequence, you could combine both saves from within the flatMap. You'd have to move the subscribeOn and log calls inside the flatMap as well, to put them on the inner save publishers:
Flux.create(sink -> ... /* listens to and receives from external source */, FluxSink.OverflowStrategy.LATEST)
    .flatMap(map -> Mono.when(
            redisHashReactiveCommands.hmset(key, map)
                    .subscribeOn(Schedulers.parallel())
                    .doOnNext(s -> log.debug("Redis consumed. Result -> {}", s)),
            kafkaReactiveCommand.something(map)
                    .subscribeOn(Schedulers.parallel())
                    .doOnNext(s -> log.debug("Kafka consumed. Result -> {}", s))
    ))
    // each Mono.when(...) is a Mono<Void>, so the chain is now a Flux<Void>
    .doOnComplete(() -> log.debug("Both Redis and Kafka completed."))
    .doOnError(exception -> log.error("Error occurred while consuming message", exception))
    .subscribe();
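On the backpressure-toward-Redis part of the question: flatMap also has an overload that takes a concurrency argument, which caps how many inner publishers run at once; Reactor only requests more items from upstream as slots free up, so the limit propagates back to the source. A small stand-alone sketch (Mono.delay stands in for a slow save):

import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapConcurrencySketch {
    public static void main(String[] args) {
        Flux.range(1, 20)
            // At most 4 inner "saves" are in flight at any time; upstream
            // demand is throttled accordingly.
            .flatMap(i -> Mono.delay(Duration.ofMillis(50)).thenReturn(i), 4)
            .doOnNext(i -> System.out.println("saved " + i))
            .blockLast();
    }
}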
As an alternative to when, if you're sure both processes emit either a result element or an error, you can combine both results into a Tuple2 by replacing when with zip.
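For illustration, a minimal, self-contained sketch of the zip variant (Mono.just values stand in for the Redis and Kafka results):

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ZipSketch {
    public static void main(String[] args) {
        Flux.just("item1", "item2")
            // Mono.zip pairs both inner results in a Tuple2; if either
            // inner Mono errors, the combined Mono errors.
            .flatMap(item -> Mono.zip(
                    Mono.just("redis-ok:" + item),  // stand-in for the Redis save
                    Mono.just("kafka-ok:" + item))) // stand-in for the Kafka send
            .subscribe(tuple -> System.out.println(
                    "redis=" + tuple.getT1() + ", kafka=" + tuple.getT2()));
    }
}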