
How to chain flux to another flux/mono and apply another back pressure?

I have the reactive code below, which uses Flux from Reactor Core:

Flux.create(sink -> ... /* listens to and receives from external source */ , FluxSink.OverflowStrategy.LATEST)
    .flatMap(map -> redisHashReactiveCommands.hmset(key, map))
    //.flatMap(... //want to store same data async into kafka with its own back pressure handling)
    .subscribeOn(Schedulers.parallel())
    .doOnNext(s -> log.debug("Redis consumed. Result -> {}", s))
    .doOnComplete(() -> log.debug("On completed."))
    .doOnError(exception -> log.error("Error occurred while consuming message", exception))
    .subscribe();

As you can see, I handle back pressure between the external source and my process (FluxSink.OverflowStrategy.LATEST). However, I also want to configure back pressure between my process and Redis (redisHashReactiveCommands.hmset(key, map)), since that can be a bigger bottleneck than the external source. I expect I'd need to create another Flux for the Redis part and link it with this one, but how do I achieve that, given that .flatMap works on individual items and not on a stream of items?

I also want to store the same emitted item in Kafka, but chaining flatMap calls doesn't seem to work. Is there an easy way to link all of these together in one set of functional calls (external source -> my process, my process -> Redis, my process -> Kafka)?

Thomas Lee asked Nov 04 '18 06:11



1 Answer

If you're not interested in the result objects in the main sequence, you could combine both saves within the flatMap. You'd also have to move the subscribeOn and the logging inside the flatMap, so that they apply to the inner save publishers:

Flux.create(sink -> ... /* listens to and receives from external source */ , FluxSink.OverflowStrategy.LATEST)
    .flatMap(map -> Mono.when(
        redisHashReactiveCommands.hmset(key, map)
            .subscribeOn(Schedulers.parallel())
            .doOnNext(s -> log.debug("Redis consumed. Result -> {}", s)),

        kafkaReactiveCommand.something(map)
            .subscribeOn(Schedulers.parallel())
            .doOnNext(s -> log.debug("Kafka consumed. Result -> {}", s))
    ))
    //... each Mono.when results in a Mono<Void>, so the main sequence emits no elements
    .doOnComplete(() -> log.debug("Both redis and kafka completed."))
    .doOnError(exception -> log.error("Error occurred while consuming message", exception))
    .subscribe();

Alternatively, if you're sure both processes emit either a result element or an error, you can combine both results into a Tuple2 by replacing when with zip.
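
As a rough illustration of the zip variant, here is a minimal self-contained sketch. The saveToRedis and saveToKafka methods are hypothetical stand-ins (not part of the original code) for the real Redis and Kafka calls, and each is assumed to emit exactly one element:

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

public class ZipBothSaves {

    // Hypothetical stand-in for redisHashReactiveCommands.hmset(key, map):
    // emits a single status value after a short simulated delay.
    static Mono<String> saveToRedis(String item) {
        return Mono.just("redis:" + item).delayElement(Duration.ofMillis(10));
    }

    // Hypothetical stand-in for the Kafka send; also emits exactly one element.
    static Mono<String> saveToKafka(String item) {
        return Mono.just("kafka:" + item).delayElement(Duration.ofMillis(5));
    }

    // For each item, subscribe to both saves and pair their results in a Tuple2.
    static List<Tuple2<String, String>> saveAll(List<String> items) {
        return Flux.fromIterable(items)
            .flatMap(item -> Mono.zip(saveToRedis(item), saveToKafka(item)))
            .collectList()
            .block(Duration.ofSeconds(5)); // blocking only to keep the demo simple
    }

    public static void main(String[] args) {
        for (Tuple2<String, String> result : saveAll(List.of("a", "b"))) {
            System.out.println(result.getT1() + " / " + result.getT2());
        }
    }
}
```

Because flatMap subscribes to the inner publishers eagerly, the order of the resulting tuples is not guaranteed. Mono.zip also completes empty if either source completes without emitting, which is why this variant only fits when both saves always produce a value or an error.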

Simon Baslé answered Sep 30 '22 17:09