
How can one slow down emissions from Flux.interval?

Would this need backpressure, or is there a simpler way?

For example, in the code below, I want the spin function to be called every 2 seconds. Sometimes spin takes longer to compute than the 2-second interval, in which case I do not want any interval emissions to queue up. But in the code below they do queue up.

In the code below, the first 4 calls to spin take 10 seconds each and the rest take 1 second each. As a result, the Flux.interval emissions 'catch up' once the function gets faster. However, I do not want any 'catch up' to happen.

import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.Date;
import java.util.Iterator;

public class Test {
    public static void main(String[] args) {
        Iterator<Integer> secs = new Iterator<Integer>() {
            private int num = 0;
            @Override
            public boolean hasNext() {
                return true;
            }
            @Override
            public Integer next() {
                return num++ < 4 ? 10 : 1;
            }
        };

        Flux.interval(Duration.ofSeconds(5))
                .map(n -> {spin(secs.next()); return n;})
                .doOnNext(n -> log("Processed " + n))
                .blockLast();

    }

    private static void spin(int secs) {
        log("Current job will take " + secs + " secs");
        long sleepTime = secs*1000000000L; // convert to nanos
        long startTime = System.nanoTime();
        while ((System.nanoTime() - startTime) < sleepTime) {}
    }

    static void log(Object label) {
        System.out.println(new Date().toString() + "\t| " + Thread.currentThread().getName() + "\t| " + label);
    }
}

Output: Notice that the "Processed" timestamps are initially spaced 10 seconds apart, but from job 4 to job 8 there is a 'catch up' that I do not want to take place. I want spin to be executed no earlier than 2 seconds after the previous invocation.

Thu Jun 01 17:16:23 EDT 2017    | parallel-1    | Current job will take 10 secs
Thu Jun 01 17:16:33 EDT 2017    | parallel-1    | Processed 0
Thu Jun 01 17:16:33 EDT 2017    | parallel-1    | Current job will take 10 secs
Thu Jun 01 17:16:43 EDT 2017    | parallel-1    | Processed 1
Thu Jun 01 17:16:43 EDT 2017    | parallel-1    | Current job will take 10 secs
Thu Jun 01 17:16:53 EDT 2017    | parallel-1    | Processed 2
Thu Jun 01 17:16:53 EDT 2017    | parallel-1    | Current job will take 10 secs
Thu Jun 01 17:17:03 EDT 2017    | parallel-1    | Processed 3
Thu Jun 01 17:17:03 EDT 2017    | parallel-1    | Current job will take 1 secs
Thu Jun 01 17:17:04 EDT 2017    | parallel-1    | Processed 4
Thu Jun 01 17:17:04 EDT 2017    | parallel-1    | Current job will take 1 secs
Thu Jun 01 17:17:05 EDT 2017    | parallel-1    | Processed 5
Thu Jun 01 17:17:05 EDT 2017    | parallel-1    | Current job will take 1 secs
Thu Jun 01 17:17:06 EDT 2017    | parallel-1    | Processed 6
Thu Jun 01 17:17:06 EDT 2017    | parallel-1    | Current job will take 1 secs
Thu Jun 01 17:17:07 EDT 2017    | parallel-1    | Processed 7
Thu Jun 01 17:17:07 EDT 2017    | parallel-1    | Current job will take 1 secs
Thu Jun 01 17:17:08 EDT 2017    | parallel-1    | Processed 8
Thu Jun 01 17:17:08 EDT 2017    | parallel-1    | Current job will take 1 secs
Thu Jun 01 17:17:09 EDT 2017    | parallel-1    | Processed 9
Thu Jun 01 17:17:13 EDT 2017    | parallel-1    | Current job will take 1 secs
Thu Jun 01 17:17:14 EDT 2017    | parallel-1    | Processed 10
Thu Jun 01 17:17:18 EDT 2017    | parallel-1    | Current job will take 1 secs
Thu Jun 01 17:17:19 EDT 2017    | parallel-1    | Processed 11
Gaurav asked Jun 01 '17 21:06



1 Answer

You can delay events with the delayElements(Duration delay) method. If you want more fine-grained control, you can use delayUntil(Function<? super T,? extends Publisher<?>> triggerProvider).

Depending on your exact requirements the result might look like this:

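 // Also needs: import reactor.core.publisher.Mono;
 Flux.interval(Duration.ofSeconds(5))
     .map(n -> {spin(secs.next()); return n;})
     .doOnNext(n -> log("Processed " + n))
     // Mono has delayElement (singular); the Duration factory method is ofSeconds
     .delayUntil(n -> Mono.just(1).delayElement(Duration.ofSeconds(Math.max(0, 2 - n))))
     .blockLast();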
Jens Schauder answered Sep 28 '22 07:09
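
The catch-up in the question's output is consistent with ticks being scheduled at a fixed rate: ticks that come due while spin is still running are executed back to back once it returns. For comparison, here is a minimal JDK-only sketch of the opposite policy, a fixed delay measured from the end of one job to the start of the next. It does not use Reactor and is not taken from the answer above; the class name FixedDelayDemo, the method runWithFixedDelay and the millisecond durations are assumptions chosen for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: not part of the question or of Reactor.
final class FixedDelayDemo {

    /**
     * Runs `jobs` simulated jobs on one thread. The first `slowJobs` jobs take
     * `slowMillis`, the rest take `fastMillis`. The next job starts `delayMillis`
     * (which must be positive) after the previous one has finished.
     * Returns the System.nanoTime() value recorded at the start of each job.
     */
    static List<Long> runWithFixedDelay(int jobs, int slowJobs, long slowMillis,
                                        long fastMillis, long delayMillis) {
        List<Long> startNanos = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(jobs);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // Fixed delay: the wait is counted from the END of the previous run,
        // so runs that were "missed" during a slow job are never replayed.
        scheduler.scheduleWithFixedDelay(() -> {
            int index = startNanos.size();
            if (index >= jobs) {
                return; // all requested jobs have already run
            }
            startNanos.add(System.nanoTime());
            sleepQuietly(index < slowJobs ? slowMillis : fastMillis);
            done.countDown();
        }, 0, delayMillis, TimeUnit.MILLISECONDS);

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return startNanos;
    }

    // Stand-in for the question's spin(): blocks the worker thread for `millis`.
    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        List<Long> starts = runWithFixedDelay(6, 2, 300, 50, 200);
        for (int i = 1; i < starts.size(); i++) {
            long gapMillis = (starts.get(i) - starts.get(i - 1)) / 1_000_000;
            System.out.println("Gap before job " + i + ": " + gapMillis + " ms");
        }
    }
}
```

In this sketch the gap between two consecutive job starts is never shorter than the configured delay, however long the earlier jobs took. It only illustrates the timing behaviour being asked for; inside a Reactor pipeline the same policy would have to be expressed with operators rather than an executor.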