 

How do I process Flux events in parallel to each other?

I have streams of incoming events that need to be enriched, and then processed in parallel as they arrive.

I thought Project Reactor was tailor-made for this job, but in my tests all of the processing seems to happen serially.

Here is some test code:

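import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

ExecutorService executor = Executors.newFixedThreadPool(10);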
System.out.println("Main thread: " + Thread.currentThread());
Flux<String> tick = Flux.interval(Duration.of(10, ChronoUnit.MILLIS))
        .map(i-> {
            System.out.println("ReactorTests.test " + Thread.currentThread());
            sleep(1000L); // simulate IO delay
            return String.format("String %d", i);
        })
        .take(3)
//    .subscribeOn(Schedulers.elastic());
//    .subscribeOn(Schedulers.newParallel("test"));
//    .subscribeOn(Schedulers.fromExecutor(executor));
;
tick.subscribe(x ->System.out.println("Subscribe thread: " + Thread.currentThread()), 
               System.out::println, 
               ()-> System.out.println("Done"));
System.out.println("DONE AND DONE");

I have tried uncommenting each of the commented lines in turn. However, in every case the output shows that the same thread processes all of the events:

Main thread: Thread[main,5,main]
[DEBUG] (main) Using Console logging
DONE AND DONE
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
Done

(The only difference is that without a Scheduler, the events are processed on the subscribe thread, whereas with any of the schedulers they all run on one and the same thread, which is not the subscribe thread.)

What am I missing?

FYI, here is the "sleep" method used above:

public static void sleep(long time) {
    try {
        Thread.sleep(time);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the thread's interrupt status
        System.out.println("Exiting");
    }
}
vinnygray asked Apr 28 '19 16:04


1 Answer

One way to process items in parallel is to use .parallel followed by .runOn:

flux
    .parallel(10)
    .runOn(scheduler)
    //
    // Work to be performed in parallel goes here.  (e.g. .map, .flatMap, etc)
    //
    // Then, if/when you're ready to go back to sequential, call .sequential()
    .sequential()
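Below is a self-contained sketch of the .parallel / .runOn pattern. It assumes reactor-core 3.4 or later on the classpath. The class name ParallelRunOnDemo and the 4-thread scheduler named "rail" are illustrative choices that do not come from the answer. The sketch collects the names of the threads that ran the map step, so you can check that more than one thread did the work.

```java
import java.util.Set;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class ParallelRunOnDemo {

    // Returns the names of the threads that executed the per-item work.
    public static Set<String> run() {
        // A dedicated 4-thread pool, so the result does not depend on the machine's CPU count.
        Scheduler scheduler = Schedulers.newParallel("rail", 4);
        try {
            return Flux.range(1, 8)
                    .parallel(4)          // split the Flux into 4 rails
                    .runOn(scheduler)     // each rail is drained on its own worker
                    .map(i -> Thread.currentThread().getName()) // runs on the rail's thread
                    .sequential()         // merge the rails back into a single Flux
                    .collect(Collectors.toSet())
                    .block();             // blocking is acceptable only in a demo or test
        } finally {
            scheduler.dispose();          // release the dedicated threads
        }
    }

    public static void main(String[] args) {
        System.out.println("Threads used: " + run());
    }
}
```

A dedicated scheduler is used here instead of the shared Schedulers.parallel(). That way the demo's thread count stays fixed, and the threads can be disposed when the demo finishes.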

Blocking operations (such as blocking IO or Thread.sleep) block the thread on which they are executed. Reactive streams cannot magically turn a blocking method into a non-blocking one. Therefore, you need to ensure that blocking methods run on a Scheduler suited to blocking operations (e.g. Schedulers.boundedElastic()).

In the example above, since you know you are calling a blocking operation, you could use .runOn(Schedulers.boundedElastic()).
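The following sketch applies that suggestion to the question's pipeline. It assumes reactor-core 3.3 or later, because Schedulers.boundedElastic() does not exist in older versions. The class name BlockingParallelDemo is hypothetical. Three ticks are split across three rails, and each rail sleeps on its own boundedElastic worker. The whole run should therefore take roughly one second rather than three.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class BlockingParallelDemo {

    // Same blocking helper as in the question, but it restores the interrupt flag.
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static List<String> run() {
        return Flux.interval(Duration.ofMillis(10))
                .take(3)
                .parallel(3)                         // one rail per tick
                .runOn(Schedulers.boundedElastic())  // a scheduler meant for blocking work
                .map(i -> {
                    System.out.println("ReactorTests.test " + Thread.currentThread());
                    sleep(1000L); // simulate IO delay
                    return String.format("String %d", i);
                })
                .sequential()                        // back to one Flux; order is not guaranteed
                .collectList()
                .block();                            // wait for completion (demo only)
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> results = run();
        System.out.println(results + " in " + (System.nanoTime() - start) / 1_000_000 + " ms");
    }
}
```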

Depending on the use case, you can also use an async operator such as .flatMap together with .subscribeOn or .publishOn to delegate specific blocking operations to another Scheduler, as described in the Project Reactor reference documentation. For example:

flux
    .flatMap(i -> Mono.fromCallable(() -> {
            System.out.println("ReactorTests.test " + Thread.currentThread());
            sleep(1000L); // simulate IO delay
            return String.format("String %d", i);
        })
        .subscribeOn(Schedulers.boundedElastic()))
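Here is a complete, runnable version of that flatMap + subscribeOn fragment. It assumes reactor-core 3.3 or later. The class FlatMapSubscribeOnDemo and the helper blockingLookup are hypothetical stand-ins for the question's sleep-based IO. Each blocking call is wrapped in its own Mono and subscribed on boundedElastic, so flatMap can keep several of them in flight at the same time.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class FlatMapSubscribeOnDemo {

    // Hypothetical blocking call standing in for real IO.
    static String blockingLookup(int i) {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return String.format("String %d on %s", i, Thread.currentThread().getName());
    }

    public static List<String> run() {
        return Flux.range(0, 3)
                // Each inner Mono is subscribed (and therefore blocks) on a boundedElastic worker,
                // so the three calls overlap instead of running one after another.
                .flatMap(i -> Mono.fromCallable(() -> blockingLookup(i))
                        .subscribeOn(Schedulers.boundedElastic()))
                .collectList()
                .block(); // blocking here is only for the demo
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> results = run();
        System.out.println(results + " in " + (System.nanoTime() - start) / 1_000_000 + " ms");
    }
}
```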

In fact, .flatMap also has an overloaded variant that takes a concurrency parameter, which limits the maximum number of inner sequences in flight at once. In some use cases this can be used instead of .parallel. It generally will not work with Flux.interval, though, because Flux.interval does not support downstream requests that replenish more slowly than the ticks arrive (it signals an overflow error instead).
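The sketch below shows the concurrency-limited flatMap(mapper, concurrency) overload. It assumes reactor-core 3.3 or later. The class FlatMapConcurrencyDemo, the helper slowSquare and the in-flight counters are hypothetical and exist only to make the limit observable. The source is Flux.range rather than Flux.interval because range emits only when downstream requests more items, which sidesteps the overflow problem described above.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class FlatMapConcurrencyDemo {

    // Counters used only to observe how many tasks run at the same time.
    private static final AtomicInteger inFlight = new AtomicInteger();
    private static final AtomicInteger maxInFlight = new AtomicInteger();

    // Hypothetical blocking task standing in for an IO call.
    static int slowSquare(int i) {
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(200L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            inFlight.decrementAndGet();
        }
        return i * i;
    }

    // Processes 12 items with at most `concurrency` inner Monos subscribed at once
    // and returns the highest number of tasks observed running simultaneously.
    public static int run(int concurrency) {
        maxInFlight.set(0);
        List<Integer> squares = Flux.range(1, 12)
                .flatMap(i -> Mono.fromCallable(() -> slowSquare(i))
                                .subscribeOn(Schedulers.boundedElastic()),
                        concurrency) // upper bound on in-flight inner sequences
                .collectList()
                .block();
        System.out.println(squares.size() + " results, max in flight: " + maxInFlight.get());
        return maxInFlight.get();
    }

    public static void main(String[] args) {
        run(3);
    }
}
```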

Phil Clay answered Nov 17 '22 12:11