
Difference between Infinite Java Stream and Reactor Flux

I am trying to figure out the conceptual differences (if there are any) between an infinite Stream and an infinite Flux.

To that end, I have come up with the following examples of an infinite Stream and an infinite Flux:

@Test
public void infinteStream() {

    // Prints an endless sequence of integers, as fast as they can be consumed
    Stream<Integer> infiniteStream = Stream.iterate(0, i -> i + 1);

    infiniteStream.forEach(System.out::println);
}

@Test
public void infiniteFlux() {

    // Prints the current date and time once per second, indefinitely
    Flux<LocalDateTime> localDateTimeFlux = Flux.interval(Duration.ofSeconds(1))
            .map(t -> LocalDateTime.now());

    // HH is the 24-hour clock; hh would print 12-hour values without an AM/PM marker
    localDateTimeFlux.subscribe(t -> System.out.println(t.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))));
}

Regarding these examples, my question is: is there an analog of infinteStream() using Flux (and of infiniteFlux() using Stream)? And, more generally, are there any differences between an infinite Stream and an infinite Flux?

Felix asked Oct 15 '18 15:10

2 Answers

Stream and Flux are quite different:

  • A Stream is single use, whereas you can subscribe to a Flux multiple times
  • A Stream is pull based (consuming one element calls for the next one), whereas a Flux has a hybrid push/pull model in which the publisher can push elements but still has to respect the backpressure signaled by the consumer
  • Streams are synchronous sequences, whereas a Flux can represent an asynchronous sequence
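The first two points can be observed with the JDK alone. The following is a minimal sketch, not part of the original answer; the class name `StreamTraits` and its helper methods are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class; only java.util.stream is used, no Reactor.
class StreamTraits {

    // Pull based: the infinite source produces only what the terminal operation asks for.
    // Every element the source actually emits is recorded in `produced`.
    static List<Integer> firstFive(List<Integer> produced) {
        return Stream.iterate(0, i -> i + 1)
                .peek(produced::add)
                .limit(5)
                .collect(Collectors.toList());
    }

    // Single use: a second terminal operation on the same Stream is rejected.
    static boolean secondUseFails() {
        Stream<Integer> stream = Stream.iterate(0, i -> i + 1).limit(3);
        stream.forEach(i -> { });       // first terminal operation consumes the stream
        try {
            stream.forEach(i -> { });   // reuse attempt
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<Integer> produced = new ArrayList<>();
        System.out.println(firstFive(produced));
        System.out.println(produced);
        System.out.println(secondUseFails());
    }
}
```

Running `main` prints `[0, 1, 2, 3, 4]` twice (the collected result and the elements the source emitted are the same five values), followed by `true`.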

In your Stream example, you're generating an infinite sequence of values that are produced and consumed as fast as possible. In your Flux example, you're producing values at a fixed interval (something I'm not sure you can do with Stream). With Flux, you can also use Flux.generate to produce sequences without intervals, just like your Stream example.

In general, you could consider Flux as a mix of Stream + CompletableFuture, with:

  • a lot of powerful operators
  • backpressure support
  • control over publisher and subscriber behavior
  • control over the notion of time (buffering windows of values, adding timeouts and fallbacks, etc.)
  • something tailored for async sequences fetched over the network (from a database or a remote Web API)
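To make "backpressure support" and "control over publisher and subscriber behavior" more concrete, here is a rough sketch of the demand mechanism. It deliberately does not use Reactor: it relies on the JDK's `java.util.concurrent.Flow` interfaces (Java 9+), which mirror the Reactive Streams contract that Flux implements. The `Counter` publisher and the `take` helper are hypothetical, single-threaded, and skip most of the rules a real publisher has to follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical, single-threaded illustration of request(n)-style backpressure.
class DemandSketch {

    // Endless counter: it pushes values, but never more than the subscriber requested.
    static class Counter implements Flow.Publisher<Integer> {
        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;
                private long demand = 0;
                private boolean emitting = false;
                private boolean cancelled = false;

                @Override
                public void request(long n) {
                    demand += n;
                    if (emitting) {
                        return; // re-entrant call from onNext: the running loop picks up the new demand
                    }
                    emitting = true;
                    while (demand > 0 && !cancelled) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                }

                @Override
                public void cancel() {
                    cancelled = true;
                }
            });
        }
    }

    // Requests `batch` items at a time and cancels once `total` items have arrived.
    static List<Integer> take(int total, int batch) {
        List<Integer> received = new ArrayList<>();
        new Counter().subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(batch); // initial demand
            }

            @Override
            public void onNext(Integer item) {
                received.add(item);
                if (received.size() == total) {
                    subscription.cancel();        // the consumer ends the infinite sequence
                } else if (received.size() % batch == 0) {
                    subscription.request(batch);  // signal readiness for the next batch
                }
            }

            @Override
            public void onError(Throwable t) { }

            @Override
            public void onComplete() { }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(take(6, 2));
    }
}
```

Here `take(6, 2)` yields `[0, 1, 2, 3, 4, 5]`: the publisher is infinite, yet it emits only in response to demand and stops once the subscriber cancels. With Reactor, this bookkeeping is handled by the library's operators rather than written by hand.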
Brian Clozel answered Nov 15 '22 19:11

For reference, I have since come up with a Stream-based solution for infiniteFlux():

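@Test
public void infiniteFluxWithStream() {

    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Print the formatted current time for every element pulled through the stream
    Stream<Integer> infiniteStream = Stream.iterate(0, i -> i + 1)
            .peek(x -> System.out.println(LocalDateTime.now().format(formatter)));

    // Block the calling thread for one second per element to mimic Flux.interval
    infiniteStream.forEach(x -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop the otherwise endless loop
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    });
}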

This is indeed ugly. However, it shows that, at least in principle, simple Flux examples can be rewritten in terms of Streams.

Felix answered Nov 15 '22 21:11
