
Difference Between Flux.create and Flux.generate

What is the difference between Flux.create and Flux.generate? I am looking--ideally with an example use case--to understand when I should use one or the other.

JJ Zabkar, asked Apr 20 '18 23:04




2 Answers

In short:

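Flux::create pushes elements without regard to the state of the app or to downstream demand, while Flux::generate computes each element on demand and can therefore react to changes in that state.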


The long version

Flux::create

Use it when you want to compute multiple (0 to infinitely many) values that are not influenced by the state of your app or the state of your pipeline (your pipeline == the chain of operations which comes after Flux::create == downstream).

Why? Because the method you pass to Flux::create keeps computing elements (or none) regardless of demand. The downstream determines how many elements (elements == next signals) it wants, and if it can't keep up, the elements that were already emitted are dropped or buffered according to an overflow strategy (by default they are buffered until the downstream asks for more).
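
To make the buffering behaviour concrete, here is a minimal sketch (the class and method names are made up for illustration and are not from the original answer). The emitter pushes ten values right away, while the subscriber asks for three and later for two more, so the overflow strategy decides what happens to the rest.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

class CreateOverflowDemo {

    // Returns the values a slow subscriber actually receives under the given strategy.
    static List<Integer> receive(FluxSink.OverflowStrategy strategy) {
        List<Integer> received = new ArrayList<>();

        // The emitter ignores demand: it pushes 0..9 immediately, then completes.
        Flux<Integer> flux = Flux.create(sink -> {
            for (int i = 0; i < 10; i++) {
                sink.next(i);
            }
            sink.complete();
        }, strategy);

        BaseSubscriber<Integer> subscriber = new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                subscription.request(3); // initial demand: three elements only
            }

            @Override
            protected void hookOnNext(Integer value) {
                received.add(value);
            }
        };

        flux.subscribe(subscriber);
        subscriber.request(2); // later demand, after the emitter has already finished
        return received;
    }
}
```

With OverflowStrategy.BUFFER this returns [0, 1, 2, 3, 4], because the surplus values wait in the buffer until requested. With OverflowStrategy.DROP it returns [0, 1, 2], because values pushed while there was no demand are discarded.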

The first and easiest use case is emitting values which you could, in theory, gather into a collection first and only then process one element at a time:

    Flux<String> articlesFlux = Flux.create((FluxSink<String> sink) -> {
        // Get all the latest articles from a server and emit them one by one to downstream.
        List<String> articles = getArticlesFromServer();
        articles.forEach(sink::next);
        // Signal that no more articles will follow.
        sink.complete();
    });

As you can see, Flux.create can bridge a blocking method (getArticlesFromServer) to asynchronous code.

I'm sure there are other use cases for Flux.create.


Flux::generate

    Flux.generate((SynchronousSink<Integer> synchronousSink) -> {
            synchronousSink.next(1);
        })
        .doOnNext(number -> System.out.println(number))
        .doOnNext(number -> System.out.println(number + 4))
        .subscribe();

The output will be 1 5 1 5 1 5 ... forever.

In each invocation of the method you pass to Flux::generate, synchronousSink may emit at most one onNext, optionally followed by a terminal signal: onNext? (onError | onComplete)?.

It means that Flux::generate computes and emits values on demand. When should you use it? When it's too expensive to compute elements that may never be used downstream, or when the events you emit are influenced by the state of the app or by the state of your pipeline (your pipeline == the chain of operations which comes after Flux::generate == downstream).
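
A small sketch of the on-demand behaviour, using the stateful variant of generate (an initial state plus a function that returns the next state). The class, the counter and the method names are assumptions made for this illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

class GenerateOnDemandDemo {

    // Counts how often the generator function is actually invoked.
    static final AtomicInteger invocations = new AtomicInteger();

    // An infinite sequence of squares: 0, 1, 4, 9, ...
    static Flux<Integer> squares() {
        return Flux.generate(
            () -> 0,                           // initial state
            (state, sink) -> {
                invocations.incrementAndGet();
                sink.next(state * state);      // exactly one element per invocation
                return state + 1;              // state for the next invocation
            });
    }

    // Takes only the first n squares from the infinite sequence.
    static List<Integer> firstSquares(int n) {
        invocations.set(0);
        return squares().take(n).collectList().block();
    }
}
```

Calling firstSquares(5) returns [0, 1, 4, 9, 16], and the generator runs once per element that downstream consumes, so nothing beyond the fifth square is computed even though the sequence itself is infinite.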

For example, if you are building a torrent application, you receive blocks of data in real time. You could use Flux::generate to hand out tasks (blocks to download) to multiple threads, computing which block to download inside Flux::generate only when some thread asks for one. That way you only emit blocks you don't have yet. The same algorithm with Flux::create will fail: Flux::create emits all the blocks we don't have up front, and if some blocks fail to download we have a problem, because Flux::create doesn't react to changes in the state of the app while Flux::generate does.
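
The torrent idea could be sketched roughly as follows. This is only an illustration under simplifying assumptions: BlockScheduler, reportFailure and simulate are hypothetical names, blocks are plain integers, and the "download" is simulated on a single thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import reactor.core.publisher.Flux;

class BlockScheduler {

    // Hypothetical shared app state: indices of the blocks we still need.
    private final Queue<Integer> missing = new ConcurrentLinkedQueue<>();

    BlockScheduler(int blockCount) {
        for (int i = 0; i < blockCount; i++) {
            missing.add(i);
        }
    }

    // A failed download puts the block back, changing the state the generator reads.
    void reportFailure(int block) {
        missing.add(block);
    }

    // The next block is chosen only when downstream asks for one.
    Flux<Integer> tasks() {
        return Flux.generate(sink -> {
            Integer next = missing.poll();
            if (next == null) {
                sink.complete();   // nothing left to download
            } else {
                sink.next(next);
            }
        });
    }

    // Simulates a run over three blocks in which block 1 fails on its first attempt.
    static List<Integer> simulate() {
        BlockScheduler scheduler = new BlockScheduler(3);
        List<Integer> attempts = new ArrayList<>();
        boolean[] failedOnce = {false};
        scheduler.tasks()
            .doOnNext(block -> {
                attempts.add(block);
                if (block == 1 && !failedOnce[0]) {
                    failedOnce[0] = true;
                    scheduler.reportFailure(block);
                }
            })
            .blockLast();
        return attempts;
    }
}
```

Here simulate() returns [0, 1, 2, 1]: the failed block is handed out again because the generator consults the current state on every request instead of fixing the list of blocks up front.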

Stav Alfi, answered Sep 22 '22 22:09


Create:

  • Accepts a Consumer<FluxSink<T>>
  • Consumer is invoked only once per subscriber
  • Consumer can emit 0..N elements immediately
  • Publisher is not aware of downstream state, so an overflow strategy can be supplied as an additional parameter (it defaults to buffering)
  • We can keep a reference to the FluxSink and use it to keep emitting elements as and when required, from multiple threads.
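
As a rough sketch of the last point, the FluxSink handed to the consumer can be shared with several producer threads. The class name, the thread pool and the element labels below are assumptions chosen for the example.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

class MultiThreadedCreateDemo {

    // Starts `producers` threads that all emit through the same FluxSink.
    static List<String> collect(int producers, int perProducer) {
        ExecutorService pool = Executors.newFixedThreadPool(producers);

        // The consumer runs once per subscriber and hands the sink to the workers.
        Flux<String> flux = Flux.create(sink -> {
            AtomicInteger remaining = new AtomicInteger(producers);
            for (int p = 0; p < producers; p++) {
                final int id = p;
                pool.execute(() -> {
                    for (int i = 0; i < perProducer; i++) {
                        sink.next("p" + id + "-" + i);
                    }
                    // The last producer to finish completes the sequence.
                    if (remaining.decrementAndGet() == 0) {
                        sink.complete();
                    }
                });
            }
        });

        try {
            return flux.collectList().block();
        } finally {
            pool.shutdown();
        }
    }
}
```

For instance, collect(3, 4) yields 12 elements. Their relative order across producers is not deterministic, only the order within each producer is.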

Generate:

  • Accepts a Consumer<SynchronousSink<T>>
  • Consumer is invoked again and again based on the downstream demand
  • Consumer can emit at most one element per invocation, with an optional complete/error signal.
  • Publisher produces elements based on the downstream demand
  • We can get the reference of SynchronousSink, but it is not really useful outside the consumer, since each invocation may emit only one element
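
The one-element-per-invocation rule can be observed with a short sketch like the one below (the class and method names are hypothetical). The generator deliberately calls next twice in the same invocation.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class GenerateContractDemo {

    // Records the signals seen by a subscriber when the generator misbehaves.
    static List<String> emitTwicePerCall() {
        List<String> events = new ArrayList<>();
        Flux.<Integer>generate(sink -> {
                sink.next(1);
                sink.next(2); // second emission in the same invocation breaks the contract
            })
            .subscribe(
                value -> events.add("next " + value),
                error -> events.add("error " + error.getClass().getSimpleName()));
        return events;
    }
}
```

The subscriber sees "next 1" followed by an IllegalStateException error signal, and the value 2 never arrives. To emit several elements, let the consumer be invoked again, or use Flux.create instead.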

Check this blog for more details.

vins, answered Sep 23 '22 22:09