Would this need back pressure or is there a simpler way?
For example in the below code , I want the spin function to be called every 2 seconds. Sometimes 'spin' can take longer time to compute than 2 second interval, in which case I do not want any interval emissions to queue up. But in the below code they do queue up.
In the code below, the first 4 spin function calls take 10 seconds and the rest take 1 second. As a result the Flux.interval emissions 'catch up' once the function gets faster. However, I do not want any 'catch up' to happen
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.Date;
import java.util.Iterator;
public class Test {
public static void main(String[] args) {
Iterator<Integer> secs = new Iterator<Integer>() {
private int num = 0;
@Override
public boolean hasNext() {
return true;
}
@Override
public Integer next() {
return num++ < 4 ? 10 : 1;
}
};
Flux.interval(Duration.ofSeconds(5))
.map(n -> {spin(secs.next()); return n;})
.doOnNext(n -> log("Processed " + n))
.blockLast();
}
private static void spin(int secs) {
log("Current job will take " + secs + " secs");
long sleepTime = secs*1000000000L; // convert to nanos
long startTime = System.nanoTime();
while ((System.nanoTime() - startTime) < sleepTime) {}
}
static void log(Object label) {
System.out.println((new Date()).toString() + "\t| " +Thread.currentThread().getName() + "\t| " + label);
}
}
Output: Notice the "Processed" timestamp initially is spaced by 10 seconds, but from job 4 to job 8, there is a 'catch up' that I do not want to take place. I want to spin to executed no earlier than 2 seconds after the previous invocation
Thu Jun 01 17:16:23 EDT 2017 | parallel-1 | Current job will take 10 secs
Thu Jun 01 17:16:33 EDT 2017 | parallel-1 | Processed 0
Thu Jun 01 17:16:33 EDT 2017 | parallel-1 | Current job will take 10 secs
Thu Jun 01 17:16:43 EDT 2017 | parallel-1 | Processed 1
Thu Jun 01 17:16:43 EDT 2017 | parallel-1 | Current job will take 10 secs
Thu Jun 01 17:16:53 EDT 2017 | parallel-1 | Processed 2
Thu Jun 01 17:16:53 EDT 2017 | parallel-1 | Current job will take 10 secs
Thu Jun 01 17:17:03 EDT 2017 | parallel-1 | Processed 3
Thu Jun 01 17:17:03 EDT 2017 | parallel-1 | Current job will take 1 secs
Thu Jun 01 17:17:04 EDT 2017 | parallel-1 | Processed 4
Thu Jun 01 17:17:04 EDT 2017 | parallel-1 | Current job will take 1 secs
Thu Jun 01 17:17:05 EDT 2017 | parallel-1 | Processed 5
Thu Jun 01 17:17:05 EDT 2017 | parallel-1 | Current job will take 1 secs
Thu Jun 01 17:17:06 EDT 2017 | parallel-1 | Processed 6
Thu Jun 01 17:17:06 EDT 2017 | parallel-1 | Current job will take 1 secs
Thu Jun 01 17:17:07 EDT 2017 | parallel-1 | Processed 7
Thu Jun 01 17:17:07 EDT 2017 | parallel-1 | Current job will take 1 secs
Thu Jun 01 17:17:08 EDT 2017 | parallel-1 | Processed 8
Thu Jun 01 17:17:08 EDT 2017 | parallel-1 | Current job will take 1 secs
Thu Jun 01 17:17:09 EDT 2017 | parallel-1 | Processed 9
Thu Jun 01 17:17:13 EDT 2017 | parallel-1 | Current job will take 1 secs
Thu Jun 01 17:17:14 EDT 2017 | parallel-1 | Processed 10
Thu Jun 01 17:17:18 EDT 2017 | parallel-1 | Current job will take 1 secs
Thu Jun 01 17:17:19 EDT 2017 | parallel-1 | Processed 11
Instead of take(1) , you could use next() . This will transform the Flux into a valued Mono by taking the first emitted item, or an empty Mono if the Flux is empty itself.
A Flux is a Reactive Stream publisher that can emit 0 to N elements. It has several operators which are used to generate, orchestrate, and transform Flux sequences. A Flux can complete successfully or complete with errors.
Project Reactor is a direct implementation of the Reactive Streams Specification. The main feature of Reactive Streams Specification is that it provides a medium of communication between the stream producer and stream consumer so that a consumer can demand the stream according to its processing capabilities.
parallel() scheduler. Most of reactive libraries (Reactive Redis, Mongo, ...) would use parallel as a default scheduler. For example Spring WebFlux, typically use Reactor Netty as a default embedded server and would initiate subscription on Schedulers. parallel() .
You can delay events with the delayElements(Duration delay)
method. If you want more finegrained control you can use delayUntil(Function<? super T,? extends Publisher<?>> triggerProvider)
Depending on your exact requirements the result might look like this:
Flux.interval(Duration.ofSeconds(5))
.map(n -> {spin(secs.next()); return n;})
.doOnNext(n -> log("Processed " + n))
.delayUntil(n -> Mono.just(1).delayElements(Duration.Seconds(Math.max(0, 2 - n)))
.blockLast();
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With