Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Flux/Publisher on main thread

I'm new to reactive programming/project-reactor, trying to understand the concepts. Created a Flux with range method and subscribed. When I look at the log, everything is running on main thread.

     Flux
        .range(1, 5)
        .log()
        .subscribe(System.out::println);

    System.out.println("End of Execution");

[DEBUG] (main) Using Console logging [ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) [ INFO] (main) | request(unbounded) [ INFO] (main) | onNext(1) 1 [ INFO] (main) | onNext(2) 2 [ INFO] (main) | onNext(3) 3 [ INFO] (main) | onNext(4) 4 [ INFO] (main) | onNext(5) 5 [ INFO] (main) | onComplete() End of Execution

Once Publisher is done with the emission of all elements, then only the rest of the code got executed(System.out.println("End of Execution"); in the above example). Publisher will block the thread by default? If I change the scheduler, seems it's not blocking the thread.

Flux
        .range(1, 5)
        .log()
        .subscribeOn(Schedulers.elastic())
        .subscribe(System.out::println);
    System.out.println("End of Execution");
    Thread.sleep(10000);

[DEBUG] (main) Using Console logging End of Execution [ INFO] (elastic-2) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) [ INFO] (elastic-2) | request(unbounded) [ INFO] (elastic-2) | onNext(1) 1 [ INFO] (elastic-2) | onNext(2) 2 [ INFO] (elastic-2) | onNext(3) 3 [ INFO] (elastic-2) | onNext(4) 4 [ INFO] (elastic-2) | onNext(5) 5 [ INFO] (elastic-2) | onComplete()

like image 783
Eswar Karumuri Avatar asked Oct 15 '25 18:10

Eswar Karumuri


1 Answers

Reactor does not enforce a concurrency model by default and yes, many operators will continue the work on the Thread where the subscribe() operation happened.

But this doesn't mean that using Reactor will block the main thread. The sample you're showing is doing in-memory work, no I/O or latency involved. Also, it's subscribing right away on the result.

You can try the following snippet and see something different:

Flux.range(1, 5)
    .delayElements(Duration.ofMillis(100))
    .log()
    .subscribe(System.out::println);
System.out.println("End of Execution");

In the logs, I'm seeing:

INFO   --- [main] reactor.Flux.ConcatMap.1 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
INFO   --- [main] reactor.Flux.ConcatMap.1 : request(unbounded)
End of Execution

In this case, delaying elements will schedule work in a different way - and since nothing here is keeping the JVM alive, the application exits and no element from the range is consumed.

In a more common scenario, I/O and latency will be involved and that work will be scheduled in appropriate ways and will not block the main application thread.

like image 113
Brian Clozel Avatar answered Oct 19 '25 14:10

Brian Clozel



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!