I am trying to split my mono to other separated mono which will be processed the same data input data on different threads.
public Mono<String> process() {
Mono<String> someString = ... // fetching data from API
someString
.publishOn(Schedulers.fromExecutorService(Executors.newFixedThreadPool(2)))
.map(String::toLowerCase)
.subscribe(this::saveLowercase);
someString
.publishOn(Schedulers.fromExecutorService(Executors.newFixedThreadPool(2)))
.map(String::toUpperCase)
.subscribe(this::saveUpperCase);
return someString;
}
I saw in logs that I fetched data 3 times because each subscription call fetching data from API. I wanted to use the .cache() method but I wonder that some new better ways to call only once API and processed multiply this data? I can exchange the Reactor into RxJava if Reactor can't do this.
In the example you gave, you created a cold Mono
. So the HTTP
call would take place for each subscriber
, as you said. If you want the HTTP
call to take place only once, create hot observable. You'll have to use .cache()
so that any future subscribers get the response.
This is the proper way to do this. Why are you looking for "something better"?
Example from the official documentation:
DirectProcessor<String> hotSource = DirectProcessor.create();
Flux<String> hotFlux = hotSource.map(String::toUpperCase).cache();
hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));
hotSource.onNext("blue");
hotSource.onNext("green");
hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));
hotSource.onNext("orange");
hotSource.onNext("purple");
hotSource.onComplete();
Both the subscribers will get all the colors now.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With