Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spring Reactor multiple subscription

I am trying to split my mono to other separated mono which will be processed the same data input data on different threads.

public Mono<String> process() { 
        Mono<String> someString = ... // fetching data from API 



        someString
                .publishOn(Schedulers.fromExecutorService(Executors.newFixedThreadPool(2)))
                .map(String::toLowerCase)
                .subscribe(this::saveLowercase);

        someString
                .publishOn(Schedulers.fromExecutorService(Executors.newFixedThreadPool(2)))
                .map(String::toUpperCase)
                .subscribe(this::saveUpperCase);

        return someString;
}

I saw in logs that I fetched data 3 times because each subscription call fetching data from API. I wanted to use the .cache() method but I wonder that some new better ways to call only once API and processed multiply this data? I can exchange the Reactor into RxJava if Reactor can't do this.

like image 528
Matrix12 Avatar asked Sep 16 '25 20:09

Matrix12


1 Answers

In the example you gave, you created a cold Mono. So the HTTP call would take place for each subscriber, as you said. If you want the HTTP call to take place only once, create hot observable. You'll have to use .cache() so that any future subscribers get the response.

This is the proper way to do this. Why are you looking for "something better"?

Example from the official documentation:

DirectProcessor<String> hotSource = DirectProcessor.create();

Flux<String> hotFlux = hotSource.map(String::toUpperCase).cache();


hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));

hotSource.onNext("blue");
hotSource.onNext("green");

hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));

hotSource.onNext("orange");
hotSource.onNext("purple");
hotSource.onComplete(); 

Both the subscribers will get all the colors now.

like image 155
Prashant Pandey Avatar answered Sep 21 '25 01:09

Prashant Pandey