Project Reactor: ConnectableFlux auto-connecting on demand

I have a single source of data items and I want to share that Flux with multiple downstream streams.

It is very similar to the example in the reference guide, but I feel that example cheats by calling .connect() manually. Specifically, I do not know in advance how many downstream subscribers there will be, and I have no hook "at the end" where I could call .connect(). Consumers should be able to subscribe without immediately triggering the pulling of data; only at some point in the future, when the data is actually needed, should they pull as necessary.

Additionally, the source is sensitive to consumption, so it cannot be re-fetched.
On top of that, it is going to be very big, so buffering and replaying the whole stream is not an option.

Ideally, on top of all that, the whole thing happens in one thread, so no concurrency or waiting.
(Giving a very small wait time for subscribers to join is not desirable)

I was able to achieve nearly the desired effect for Monos (single end result values):

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;

import org.junit.Test;

import org.assertj.core.api.SoftAssertions;

import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;

public class CoConsumptionTest {
    @Test
    public void convenientCoConsumption() {
        // List used just for the example:
        List<Tuple2<String, String>> source = Arrays.asList(
                Tuples.of("a", "1"), Tuples.of("b", "1"), Tuples.of("c", "1"),
                Tuples.of("a", "2"), Tuples.of("b", "2"), Tuples.of("c", "2"),
                Tuples.of("a", "3"), Tuples.of("b", "3"), Tuples.of("c", "3")
        );

        // Source which is sensitive to consumption
        AtomicInteger consumedCount = new AtomicInteger(0);
        Iterator<Tuple2<String, String>> statefulIterator = new Iterator<Tuple2<String, String>>() {
            private ListIterator<Tuple2<String, String>> sourceIterator = source.listIterator();

            @Override
            public boolean hasNext() {
                return sourceIterator.hasNext();
            }

            @Override
            public Tuple2<String, String> next() {
                Tuple2<String, String> e = sourceIterator.next();
                consumedCount.incrementAndGet();
                System.out.println("Audit: " + e);
                return e;
            }
        };

        // Logic in the service:
        Flux<Tuple2<String, String>> f = Flux.fromIterable(() -> statefulIterator);
        ConnectableFlux<Tuple2<String, String>> co = f.publish();

        Function<Predicate<Tuple2<String, String>>, Mono<Tuple2<String, String>>> findOne = (highlySelectivePredicate) ->
                co.filter(highlySelectivePredicate)
                        .next() //gives us a Mono
                        .toProcessor() //makes it eagerly subscribe and demand from the upstream, so it won't miss emissions
                        .doOnSubscribe(s -> co.connect()); //when an actual user consumer subscribes

        // Subscribing (outside the service)
        assumeThat(consumedCount).hasValue(0);
        Mono<Tuple2<String, String>> a2 = findOne.apply(select("a", "2"));
        Mono<Tuple2<String, String>> b1 = findOne.apply(select("b", "1"));
        Mono<Tuple2<String, String>> c1 = findOne.apply(select("c", "1"));
        assertThat(consumedCount).hasValue(0);

        // Data is needed
        SoftAssertions softly = new SoftAssertions();

        assertThat(a2.block()).isEqualTo(Tuples.of("a", "2"));
        softly.assertThat(consumedCount).hasValue(4);

        assertThat(b1.block()).isEqualTo(Tuples.of("b", "1"));
        softly.assertThat(consumedCount).hasValue(4);

        assertThat(c1.block()).isEqualTo(Tuples.of("c", "1"));
        softly.assertThat(consumedCount).hasValue(4);

        softly.assertAll();
    }

    private static Predicate<Tuple2<String, String>> select(String t1, String t2) {
        return e -> e.getT1().equals(t1) && e.getT2().equals(t2);
    }
}

Question: How can I achieve this for Flux results, i.e. for multiple values remaining after the filtering is applied, not just the first/next one? (Still demanding from the source only as much as necessary.)
(I tried naively replacing .toProcessor() with .publish().autoConnect(0), but that did not work.)

Edit 1: While buffering of the source is not allowed, the filters that come as parameters are expected to be highly selective, so buffering after the filtering is okay.
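Given that allowance, one possible Flux analogue of the .toProcessor() trick is to replay only the post-filter elements: eagerly subscribe a replay buffer to the shared source so nothing is missed, and connect the source only when a real consumer demands data. This is a sketch under that assumption (the class and the findAll helper are illustrative names, not from the original post):

```java
import java.util.List;
import java.util.function.Predicate;

import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;

public class FilteredReplaySketch {

    // Hypothetical helper: eagerly subscribes a replay buffer to the shared
    // source (so no filtered element is missed), but only connects the source
    // once a real consumer subscribes and demands data.
    static <T> Flux<T> findAll(ConnectableFlux<T> co, Predicate<T> p) {
        return co.filter(p)
                .replay()                  // buffers only the (few) filtered elements
                .autoConnect(0)            // subscribe upstream immediately, like toProcessor()
                .doOnSubscribe(s -> co.connect()); // pull data when actually consumed
    }

    static List<Integer> evens() {
        ConnectableFlux<Integer> co = Flux.range(1, 10).publish();
        return findAll(co, i -> i % 2 == 0).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(evens()); // [2, 4, 6, 8, 10]
    }
}
```

Since the filter is highly selective, the replay buffer stays small even though the source itself is never buffered.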

Edit 2: Coming back to this after a while, I tried my posted example on a newer version of reactor and it actually works.

io.projectreactor:reactor-bom:Californium-SR8
> io.projectreactor:reactor-core:3.2.9.RELEASE
Asked Jun 16 '19 by Anly
1 Answer

I don't like giving a "non-answer" style answer, but I think at least one of your requirements has to give here. From your question, the requirements seem to be:

  • Buffering not allowed
  • Not allowed to drop elements
  • Unknown number of subscribers
  • Subscribers can connect at any time
  • Each subscriber must have all the data available when it demands it
  • No re-fetching from source

Take the case where one subscriber requests data from a Flux, the first few elements of that Flux are consumed, and then another subscriber shows up at some arbitrary point in the future wanting that same data. With the above requirements, that is impossible: you either have to go and get the data again, or have it saved somewhere, and you have ruled out both of those options.

However, if you're prepared to relax those requirements a bit, then there's a few potential options:

Known number of subscribers

If you can work out the number of subscribers you'll end up with somehow, then you can use autoConnect(n) to automatically connect to a ConnectableFlux after that number of subscriptions has been made.
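For instance, with two known subscribers (a minimal sketch; the counter is only there to audit consumption):

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

public class AutoConnectExample {

    // Returns how many source elements had been consumed after the first
    // and after the second subscription, respectively.
    static int[] consumedCounts() {
        AtomicInteger pulls = new AtomicInteger();
        Flux<Integer> shared = Flux.range(1, 5)
                .doOnNext(i -> pulls.incrementAndGet()) // audit consumption
                .publish()
                .autoConnect(2); // source starts only once 2 subscribers are present

        shared.subscribe();
        int afterOne = pulls.get(); // still 0: not connected yet
        shared.subscribe();
        int afterTwo = pulls.get(); // 5: the second subscriber triggered connect()
        return new int[]{afterOne, afterTwo};
    }

    public static void main(String[] args) {
        int[] counts = consumedCounts();
        System.out.println("after one subscriber: " + counts[0]
                + ", after two: " + counts[1]);
    }
}
```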

Allowing elements to be dropped

If you can allow elements to be dropped, then you can just call share() on the original Flux to make it auto-connect on the first subscription; subscribers that arrive later will simply miss the elements emitted before they joined.
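A minimal sketch of that dropping behaviour: here a second subscriber joins mid-stream (from inside the first subscriber's callback, to keep everything on one thread) and misses the elements emitted before it arrived:

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

public class ShareExample {

    static List<Integer> lateSubscriberView() {
        List<Integer> late = new ArrayList<>();
        Flux<Integer> shared = Flux.range(1, 5).share(); // connects on first subscribe

        shared.subscribe(i -> {
            if (i == 3) {
                shared.subscribe(late::add); // a late subscriber joins mid-stream
            }
        });
        return late; // elements emitted before it joined were dropped
    }

    public static void main(String[] args) {
        System.out.println(lateSubscriberView());
    }
}
```

The late subscriber only sees the remainder of the stream (4 and 5 here); 1 through 3 are gone.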

Allowing a time for subscribers to connect

This is perhaps one of the more promising strategies, since you say:

no concurrency or waiting. (Giving a very small wait time for subscribers to join is not desirable)

You can turn the Flux into a hot source that caches all emitted elements for a certain time period. This means that you can, at the cost of some amount of memory (but without buffering the whole stream), give subscribers a small wait time when they can subscribe and still receive all the data.
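A sketch of that trade-off using cache(Duration): a late subscriber inside the window replays the elements without the source being consumed again (the subscription counter is only there to prove the point):

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

public class CacheTtlExample {

    // Returns the number of times the underlying source was subscribed.
    static int sourceSubscriptions() {
        AtomicInteger subs = new AtomicInteger();
        Flux<Integer> cached = Flux.range(1, 3)
                .doOnSubscribe(s -> subs.incrementAndGet())
                .cache(Duration.ofMinutes(1)); // replay to anyone inside the window

        List<Integer> first = cached.collectList().block(); // runs the source once
        List<Integer> late = cached.collectList().block();  // served from the cache
        return subs.get(); // 1: the source was not re-fetched
    }

    public static void main(String[] args) {
        System.out.println("source subscriptions: " + sourceSubscriptions());
    }
}
```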

Buffering a known number of elements

Similarly to above, you can use another variant of the cache() method to just cache a known number of elements. If you know you can safely fit n elements into memory, but no more, then this could give you the maximum time possible for subscribers to safely connect.
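For example, cache(10) keeps at most ten elements in memory; a late subscriber sees only that tail of the stream (a minimal sketch):

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class CacheHistoryExample {

    static List<Integer> lateView() {
        Flux<Integer> cached = Flux.range(1, 100).cache(10); // keep at most 10 elements
        cached.blockLast();                  // a first subscriber drains the whole source
        return cached.collectList().block(); // a late subscriber replays only the tail
    }

    public static void main(String[] args) {
        System.out.println(lateView()); // [91, 92, ..., 100]
    }
}
```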

Answered Oct 06 '22 by Michael Berry