I have a single source of data items and I want to share that Flux with multiple downstream streams.
It is very similar to the example in the reference guide, but I feel that example cheats by calling .connect() manually. Specifically, I do not know how many downstream subscribers there will be, and I have no control point from which to call .connect() "at the end".
Consumers should be able to subscribe without immediately triggering the pulling of data; then, at some point in the future when the data is actually needed, they will pull as necessary.
Additionally, the source is sensitive to consumption, so it cannot be re-fetched. On top of that, it is going to be very big, so buffering and replaying it is not an option.
Ideally, on top of all that, the whole thing happens in one thread, so no concurrency or waiting.
(Giving a very small wait time for subscribers to join is not desirable)
I was able to achieve nearly the desired effect for Monos (single end result values):
```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;

import org.assertj.core.api.SoftAssertions;
import org.junit.Test;

import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;

public class CoConsumptionTest {

    @Test
    public void convenientCoConsumption() {
        // List used just for the example:
        List<Tuple2<String, String>> source = Arrays.asList(
                Tuples.of("a", "1"), Tuples.of("b", "1"), Tuples.of("c", "1"),
                Tuples.of("a", "2"), Tuples.of("b", "2"), Tuples.of("c", "2"),
                Tuples.of("a", "3"), Tuples.of("b", "3"), Tuples.of("c", "3")
        );

        // Source which is sensitive to consumption
        AtomicInteger consumedCount = new AtomicInteger(0);
        Iterator<Tuple2<String, String>> statefulIterator = new Iterator<Tuple2<String, String>>() {
            private ListIterator<Tuple2<String, String>> sourceIterator = source.listIterator();

            @Override
            public boolean hasNext() {
                return sourceIterator.hasNext();
            }

            @Override
            public Tuple2<String, String> next() {
                Tuple2<String, String> e = sourceIterator.next();
                consumedCount.incrementAndGet();
                System.out.println("Audit: " + e);
                return e;
            }
        };

        // Logic in the service:
        Flux<Tuple2<String, String>> f = Flux.fromIterable(() -> statefulIterator);
        ConnectableFlux<Tuple2<String, String>> co = f.publish();
        Function<Predicate<Tuple2<String, String>>, Mono<Tuple2<String, String>>> findOne =
                (highlySelectivePredicate) ->
                        co.filter(highlySelectivePredicate)
                          .next()          // gives us a Mono
                          .toProcessor()   // makes it eagerly subscribe and demand from the upstream, so it won't miss emissions
                          .doOnSubscribe(s -> co.connect()); // when an actual user consumer subscribes

        // Subscribing (outside the service)
        assumeThat(consumedCount).hasValue(0);
        Mono<Tuple2<String, String>> a2 = findOne.apply(select("a", "2"));
        Mono<Tuple2<String, String>> b1 = findOne.apply(select("b", "1"));
        Mono<Tuple2<String, String>> c1 = findOne.apply(select("c", "1"));
        assertThat(consumedCount).hasValue(0);

        // Data is needed
        SoftAssertions softly = new SoftAssertions();
        assertThat(a2.block()).isEqualTo(Tuples.of("a", "2"));
        softly.assertThat(consumedCount).hasValue(4);
        assertThat(b1.block()).isEqualTo(Tuples.of("b", "1"));
        softly.assertThat(consumedCount).hasValue(4);
        assertThat(c1.block()).isEqualTo(Tuples.of("c", "1"));
        softly.assertThat(consumedCount).hasValue(4);
        softly.assertAll();
    }

    private static Predicate<Tuple2<String, String>> select(String t1, String t2) {
        return e -> e.getT1().equals(t1) && e.getT2().equals(t2);
    }
}
```
Question: I want to know how to achieve this for Flux results, i.e. for multiple values after the filtering is applied, not just the first/next. (Still demanding only as much as necessary)
(I tried naively replacing .toProcessor() with .publish().autoConnect(0), but did not succeed.)
Edit 1: While buffering of the source is not allowed, the filters that come as parameters are expected to be highly selective, so buffering after the filtering is okay.
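Given that allowance, one way the Flux case might be sketched is to cache() only the post-filter elements, keep the cache eagerly subscribed so it cannot miss emissions, and connect the source lazily on the first real subscription. The findAll helper and the toy string source below are hypothetical stand-ins, not a confirmed solution:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;

import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;

public class FindAllSketch {

    // Returns both filtered result lists after consuming the source once.
    public static List<List<String>> run() {
        Flux<String> source = Flux.just("a1", "b1", "a2", "b2")
                .doOnNext(e -> System.out.println("Audit: " + e));
        ConnectableFlux<String> co = source.publish();
        AtomicBoolean connected = new AtomicBoolean();

        // Hypothetical helper: buffers only the (few) elements that survive
        // the highly selective filter, never the whole source.
        Function<Predicate<String>, Flux<String>> findAll = p -> {
            Flux<String> filtered = co.filter(p).cache();
            // Subscribe the cache eagerly so it cannot miss emissions...
            filtered.subscribe();
            // ...but connect the source only when a real consumer shows up.
            return filtered.doOnSubscribe(s -> {
                if (connected.compareAndSet(false, true)) {
                    co.connect();
                }
            });
        };

        Flux<String> as = findAll.apply(e -> e.startsWith("a"));
        Flux<String> bs = findAll.apply(e -> e.startsWith("b"));

        List<String> aList = as.collectList().block(); // triggers the single consumption
        List<String> bList = bs.collectList().block(); // replayed from the small cache
        return List.of(aList, bList);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [[a1, a2], [b1, b2]]
    }
}
```

The AtomicBoolean guard avoids calling co.connect() more than once, since a second connect() on an already-terminated publish() would resubscribe the sensitive source.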
Edit 2: Coming back to this after a while, I tried my posted example on a newer version of Reactor and it actually works:
io.projectreactor:reactor-bom:Californium-SR8 (io.projectreactor:reactor-core:3.2.9.RELEASE)
I don't like giving a "non-answer" style answer, but I think at least one of your requirements has to give here. From your question, the requirements seem to be:

- the number of downstream subscribers is not known in advance;
- the source cannot be re-fetched;
- the (very big) source cannot be buffered or replayed;
- no waiting for subscribers to join.

Take the case where one subscriber requests data from a Flux, the first few elements in that Flux are consumed, and then eventually another subscriber shows up at an arbitrary point in the future that wants that same data. With the above requirements, that's impossible - you'll either have to go and get the data again, or have it saved somewhere, and you've ruled both those options out.
However, if you're prepared to relax those requirements a bit, then there are a few potential options:
- If you can work out the number of subscribers you'll end up with somehow, then you can use autoConnect(n) to automatically connect to a ConnectableFlux after that number of subscriptions has been made.
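A minimal sketch of autoConnect(n), with a toy Flux.range standing in for the real single-use source:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;

public class AutoConnectExample {

    public static List<Integer> firstSubscriberValues() {
        Flux<Integer> source = Flux.range(1, 3)
                .doOnNext(i -> System.out.println("Audit: " + i));

        // Nothing is pulled from the source until the 2nd subscriber arrives.
        Flux<Integer> shared = source.publish().autoConnect(2);

        List<Integer> first = new CopyOnWriteArrayList<>();
        shared.subscribe(first::add);          // no emission yet
        System.out.println("after 1st subscriber: " + first); // []

        List<Integer> second = new CopyOnWriteArrayList<>();
        shared.subscribe(second::add);         // 2nd subscription triggers connect()
        return first;                          // both lists now hold [1, 2, 3]
    }

    public static void main(String[] args) {
        System.out.println(firstSubscriberValues()); // [1, 2, 3]
    }
}
```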
- If you can allow elements to be dropped, then you can just call share() on the original Flux to get it to auto-connect on the first subscription; future subscribers will then have previous elements dropped. This is perhaps one of the more promising strategies, since you say: "no concurrency or waiting. (Giving a very small wait time for subscribers to join is not desirable)"
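A small sketch of that dropping behaviour, using a toy ticking source (the timings are illustrative only and assume modest scheduler jitter):

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;

public class ShareExample {

    public static List<List<Long>> run() throws InterruptedException {
        // A finite ticking source; share() auto-connects on the 1st subscription.
        Flux<Long> shared = Flux.interval(Duration.ofMillis(50)).take(6).share();

        List<Long> early = new CopyOnWriteArrayList<>();
        shared.subscribe(early::add);     // connects; consumption starts now

        Thread.sleep(130);                // let roughly the first two ticks pass

        List<Long> late = new CopyOnWriteArrayList<>();
        shared.subscribe(late::add);      // earlier ticks are gone for this subscriber

        Thread.sleep(400);                // wait for the remaining ticks
        return List.of(early, late);
    }

    public static void main(String[] args) throws InterruptedException {
        List<List<Long>> lists = run();
        System.out.println("early: " + lists.get(0)); // e.g. [0, 1, 2, 3, 4, 5]
        System.out.println("late:  " + lists.get(1)); // e.g. [2, 3, 4, 5] - ticks 0 and 1 were dropped
    }
}
```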
- You can turn the Flux into a hot source that caches all emitted elements for a certain time period, e.g. with cache(Duration). This means that you can, at the cost of some amount of memory (but without buffering the whole stream), give subscribers a small window in which they can subscribe and still receive all the data.
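Assuming the time-based cache(Duration) variant is what's meant here, a sketch with a toy source; the second read within the TTL replays from memory rather than re-consuming the source:

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

public class CacheTtlExample {

    public static int consumedAfterTwoReads() {
        AtomicInteger consumed = new AtomicInteger();
        Flux<Integer> cached = Flux.range(1, 3)
                .doOnNext(i -> consumed.incrementAndGet())
                .cache(Duration.ofSeconds(5)); // keep elements around for 5s

        List<Integer> first = cached.collectList().block();  // pulls the source once
        List<Integer> second = cached.collectList().block(); // replayed from the cache
        System.out.println(first + " / " + second); // [1, 2, 3] / [1, 2, 3]
        return consumed.get(); // 3 - the source was consumed exactly once
    }

    public static void main(String[] args) {
        System.out.println("consumed: " + consumedAfterTwoReads());
    }
}
```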
- Similarly to the above, you can use another variant of the cache() method to cache only a known number of elements. If you know you can safely fit n elements into memory, but no more, then this could give you the maximum time possible for subscribers to safely connect.
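A sketch of the bounded-history variant, cache(n), again with a toy source; a late subscriber only sees the cached tail:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

public class CacheHistoryExample {

    public static List<Integer> lateSubscriberView() {
        AtomicInteger consumed = new AtomicInteger();
        Flux<Integer> cached = Flux.range(1, 5)
                .doOnNext(i -> consumed.incrementAndGet())
                .cache(2); // keep only the last 2 elements in memory

        // The first subscriber connects the source and sees everything, live.
        System.out.println(cached.collectList().block()); // [1, 2, 3, 4, 5]

        // A late subscriber only gets the cached tail - earlier data is gone.
        List<Integer> late = cached.collectList().block();
        System.out.println("consumed: " + consumed.get()); // 5 - source consumed once
        return late;
    }

    public static void main(String[] args) {
        System.out.println(lateSubscriberView()); // [4, 5]
    }
}
```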