I am trying to figure out the conceptual differences between an infinite Stream and an infinite Flux respectively (if there are any).
For that matter, I have come up with the following examples for an infinite Stream/Flux
@Test
public void infinteStream() {
//Prints infinite number of integers
Stream<Integer> infiniteStream = Stream.iterate(0, i -> i+1);
infiniteStream.forEach(System.out::println);
}
@Test
public void infiniteFlux() {
//Prints infinite number of date strings (every second)
Flux<LocalDateTime> localDateTimeFlux = Flux.interval(Duration.ofSeconds(1))
.map(t -> LocalDateTime.now());
localDateTimeFlux.subscribe(t -> System.out.println(t.format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss"))));
}
Regarding these examples, I have the question: Is there an analog for infinteStream() with Flux (and for infinteFlux() with Stream respectively)? And, more generally, are there any differences between an infinite Stream and Flux?
A Flux object represents a reactive sequence of 0.. N items, while a Mono object represents a single-value-or-empty (0..1) result. This distinction carries a bit of semantic information into the type, indicating the rough cardinality of the asynchronous processing.
Mono and Flux are both reactive streams. They differ in what they express. A Mono is a stream of 0 to 1 element, whereas a Flux is a stream of 0 to N elements.
We can create an infinite stream of any custom type elements by passing a function of a Supplier interface to a generate() method on a Stream.
Reactor is a fourth-generation reactive library, based on the Reactive Streams. specification, for building non-blocking applications on the JVM.
Stream
and Flux
are quite different:
Stream
is single use, vs. you can subscribe multiple times to Flux
Stream
is pull based (consuming one element calls for the next one) vs. Flux
has an hybrid push/pull model where the publisher can push elements but still has to respect backpressure signaled by the consumerStream
are synchronous sequences vs. Flux
can represent asynchronous sequencesIn example you're generating an infinite sequence of values with Stream
, they're produced and consumed as fast as possible. In your Flux
example, you're producing values at a fixed interval (something I'm not sure you can do with Stream
). With Flux
, you can also Flux.generate
sequences without intervals, just like your Stream
example.
In general, you could consider Flux
as a mix of Stream
+ CompletableFuture
, with:
For reference, in the meantime, I have come up with a Stream-Solution for infiniteFlux():
@Test
public void infiniteFluxWithStream() {
Stream<Integer> infiniteStream = Stream.iterate(0, i -> i+1).peek(x->{
LocalDateTime t = LocalDateTime.now();
t.format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss"));
System.out.println(t);
});
infiniteStream.forEach(x->{
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
This is indeed ugly. However, it shows that in (very) principle, it is possible to rewrite simple Flux-Examples in terms of Streams.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With