
How to convert a cold stream into a hot stream in Project Reactor 3?

By definition, Mono and Flux each represent an asynchronous sequence of data, and nothing happens until you subscribe.

There are two broad families of publishers: hot and cold. Cold publishers generate data anew for each subscription, and that is how a Flux built with Flux.fromIterable behaves. If no subscription is created, the data is never generated.

Hot publishers, on the other hand, do not depend on the number of subscribers.

Here is my code for the cold stream:

        System.out.println("*********Calling coldStream************");
        Flux<String> source = Flux.fromIterable(Arrays.asList("ram", "sam", "dam", "lam"))
                .doOnNext(System.out::println)
                .filter(s -> s.startsWith("l"))
                .map(String::toUpperCase);

        source.subscribe(d -> System.out.println("Subscriber 1: "+d));
        source.subscribe(d -> System.out.println("Subscriber 2: "+d));
        System.out.println("-------------------------------------");

and here is the output:

*********Calling coldStream************
ram
sam
dam
lam
Subscriber 1: LAM
ram
sam
dam
lam
Subscriber 2: LAM
-------------------------------------

How can I convert the above cold stream into a hot stream?

KayV asked Jan 29 '23 22:01



1 Answer

You can convert a cold stream into a hot stream by calling publish() on it, which creates a ConnectableFlux. A ConnectableFlux does not start emitting when subscribers subscribe; nothing happens until you call its connect() method. Try this example:

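    // Needs: java.util.Arrays, reactor.core.publisher.Flux, reactor.core.publisher.ConnectableFlux
    Flux<String> source = Flux.fromIterable(Arrays.asList("ram", "sam", "dam", "lam"))
            .doOnNext(System.out::println)
            .filter(s -> s.startsWith("l"))
            .map(String::toUpperCase);

    ConnectableFlux<String> connectable = source.publish();
    // Both subscribers register first; nothing is emitted yet.
    connectable.subscribe(d -> System.out.println("Subscriber 1: " + d));
    connectable.subscribe(d -> System.out.println("Subscriber 2: " + d));
    // connect() subscribes to the source once and shares its items.
    connectable.connect();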

The output is:

ram
sam
dam
lam
Subscriber 1: LAM
Subscriber 2: LAM

Second example:

    Flux<String> source = Flux.fromIterable(Arrays.asList("ram", "sam", "dam", "lam"))
            .doOnNext(System.out::println)
            .filter(s -> s.startsWith("l"))
            .map(String::toUpperCase);

    ConnectableFlux<String> connectable = source.publish();
    connectable.subscribe(d -> System.out.println("Subscriber 1: " + d));
    // The source is emitted here, while only Subscriber 1 is registered.
    connectable.connect();
    // Subscriber 2 arrives after the items have already been emitted.
    connectable.subscribe(d -> System.out.println("Subscriber 2: " + d));

The output is:

ram
sam
dam
lam
Subscriber 1: LAM

These two examples show that data starts flowing the moment we call connect(). In the second example, Subscriber 2 subscribes after connect() has already emitted every item, so it receives nothing.
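If it helps to see why the order of subscribe and connect matters, here is a minimal plain-Java sketch of the idea behind the two examples above. It is only a model under my own assumptions, not how Reactor implements publish(): the class ConnectableSketch and its demo method are hypothetical names, the filter and map steps are hard-coded, and lines are collected in a list instead of being printed directly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of publish()/connect(); an illustration, not Reactor's implementation. */
final class ConnectableSketch {
    private final List<String> items;   // stands in for the cold upstream
    private final List<String> log;     // records what the real code would print
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    ConnectableSketch(List<String> items, List<String> log) {
        this.items = items;
        this.log = log;
    }

    /** Registers a subscriber; the upstream is not started here. */
    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    /** Runs the upstream once and hands each result to the subscribers registered so far. */
    void connect() {
        List<Consumer<String>> current = new ArrayList<>(subscribers);
        for (String item : items) {
            log.add(item);                          // like doOnNext: once per item, not per subscriber
            if (item.startsWith("l")) {             // like filter
                String upper = item.toUpperCase();  // like map
                for (Consumer<String> subscriber : current) {
                    subscriber.accept(upper);
                }
            }
        }
    }

    /** Replays the two scenarios above and returns the recorded lines. */
    static List<String> demo(boolean secondSubscribesBeforeConnect) {
        List<String> log = new ArrayList<>();
        ConnectableSketch connectable =
                new ConnectableSketch(List.of("ram", "sam", "dam", "lam"), log);
        connectable.subscribe(d -> log.add("Subscriber 1: " + d));
        if (secondSubscribesBeforeConnect) {
            connectable.subscribe(d -> log.add("Subscriber 2: " + d));
        }
        connectable.connect();
        if (!secondSubscribesBeforeConnect) {
            // Too late: the items have already been handed out.
            connectable.subscribe(d -> log.add("Subscriber 2: " + d));
        }
        return log;
    }

    public static void main(String[] args) {
        demo(true).forEach(System.out::println);
        System.out.println("---");
        demo(false).forEach(System.out::println);
    }
}
```

Calling demo(true) mirrors the first example and demo(false) the second. In both cases the source items are logged only once, which is the difference from the cold stream in the question, where every subscriber triggered its own pass over the data.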

excbot answered Apr 08 '23 14:04
