As per the definition of Mono and Flux, both represent an asynchronous sequence of data, and nothing happens before you subscribe.
And there are two broad families of publishers: hot and cold. Mono and Flux generate data anew for each subscription. If no subscription is created, then data never gets generated.
And Hot publishers, on the other hand, do not depend on any number of subscribers.
Here is my code for the cold stream:
System.out.println("*********Calling coldStream************");
Flux<String> source = Flux.fromIterable(Arrays.asList("ram", "sam", "dam", "lam"))
.doOnNext(System.out::println)
.filter(s -> s.startsWith("l"))
.map(String::toUpperCase);
source.subscribe(d -> System.out.println("Subscriber 1: "+d));
source.subscribe(d -> System.out.println("Subscriber 2: "+d));
System.out.println("-------------------------------------");
and here is the output:
*********Calling composeStream************
ram
sam
dam
lam
Subscriber 1: LAM
ram
sam
dam
lam
Subscriber 2: LAM
-------------------------------------
How can i convert the above cold stream into the hot stream?
You can convert cold stream into hot stream by calling "publish" on the cold stream, it will create a ConnectableFlux. Since it is a hot stream nothing will happen until you call connect method on it, even if you subscribed. try out this example:
Flux<String> source = Flux.fromIterable(Arrays.asList("ram", "sam", "dam", "lam"))
.doOnNext(System.out::println)
.filter(s -> s.startsWith("l"))
.map(String::toUpperCase);
ConnectableFlux<String> connectable = source.publish();
connectable.subscribe(d -> System.out.println("Subscriber 1: "+d));
connectable.subscribe(d -> System.out.println("Subscriber 2: "+d));
connectable.connect();
The output is :
ram sam dam lam Subscriber 1: LAM Subscriber 2: LAM
Second example:
Flux<String> source = Flux.fromIterable(Arrays.asList("ram", "sam", "dam", "lam"))
.doOnNext(System.out::println)
.filter(s -> s.startsWith("l"))
.map(String::toUpperCase);
ConnectableFlux<String> connectable = source.publish();
connectable.subscribe(d -> System.out.println("Subscriber 1: "+d));
connectable.connect();
connectable.subscribe(d -> System.out.println("Subscriber 2: "+d));
The output is:
ram sam dam lam Subscriber 1: LAM
With these two example you can see that data start flowing from the moment wee call "connect" method
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With