Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Project Reactor: How to delay emission of (throttle) each element?

Consider the following Flux

Flux.range(1, 5)
  .parallel(10)
  .runOn(Schedulers.parallel())
  .map(i -> "https://www.google.com")
  .flatMap(uri -> Mono.fromCallable(new HttpGetTask(httpClient, uri)))

HttpGetTask is a Callable whose actual implementation is irrelevant in this case, it makes a HTTP GET call to the given URI and returns the content if successful.

Now, I'd like to slow down the emission by introducing an artificial delay, such that up to 10 threads are started simultaneously, but each one doesn't complete as soon as HttpGetTask is done. For example, say no thread must finish before 3 seconds. How do I achieve that?

like image 215
Abhijit Sarkar Avatar asked May 10 '17 14:05

Abhijit Sarkar


People also ask

What does mono defer do?

What Is the Mono. defer Method? Here, defer takes in a Supplier of Mono publisher and returns that Mono lazily when subscribed downstream.

What is the use of flux fromIterable?

Flux. fromIterable : This is used to build a stream from collections. All collections are of the Iterable<T> type, which can be passed to this to generate the intended stream.

What is flux in reactor?

Mono and Flux are both reactive streams. They differ in what they express. A Mono is a stream of 0 to 1 element, whereas a Flux is a stream of 0 to N elements.

What is the difference between mono and flux?

A Flux object represents a reactive sequence of 0.. N items, while a Mono object represents a single-value-or-empty (0..1) result. This distinction carries a bit of semantic information into the type, indicating the rough cardinality of the asynchronous processing.


1 Answers

If the requirement is really "not less than 3s" you could add a delay of 3 seconds to the Mono inside the flatMap by using Mono.fromCallable(...).delayElement(Duration.ofSeconds(3)).

like image 186
Simon Baslé Avatar answered Sep 28 '22 11:09

Simon Baslé