Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spring Reactor Thread Model

Newbie alert to Spring Webflux (v 2.0.1.RELEASE).

I'd like to use Spring Webflux for a back end (Webless) application for processing a large amount of data from a JMS listener.

My understanding is Spring Webflux provides an ]non-blocking/async concurrency model. However, I got a basic question for which I need some help. As a disclaimer, this whole concept of reactive programming is very new to me and I'm still in the process of this paradigm shift.

Consider this code:

Mono.just("ONE")
.map(item -> func(" A " + item))
.map(item -> func(" B " + item))
.map(item -> func(" C " + item))
.subscribe(System.out::println);

Mono.just("TWO")
.map(item -> func(" A " + item))
.map(item -> func(" B " + item))
.map(item -> func(" C " + item))
.subscribe(System.out::println);

I understand from the docs that nothing happens to the event processing chain until a "subscribe" function is called upon.

But internally, does the spring use (if it wishes) use separate threads asynchronously for every function inside the "map" function? If spring uses a "single" thread for these chains, then what's the real purpose here? Isn't it a blocking and single threaded model based on a different syntax?

I observe that the code always behaves sequentially and with the same thread. What's the threading model of spring webflux?

like image 608
user1189332 Avatar asked Apr 18 '18 21:04

user1189332


People also ask

Is WebFlux multithreaded?

Spring WebFlux offers a mechanism to switch processing to a different thread pool in between a data flow chain. This can provide us with precise control over the scheduling strategy that we want for certain tasks.

Can I use Springmvc and WebFlux together?

Use Spring MVC with WebFlux to build Java-based web applications. Employ the various Spring MVC architectures. Work with controllers and routing functions. Build microservices and web services using Spring MVC and REST.

What is thread per request model?

Thread-Per-Request Model. In this model, each request from a client is processed in a different thread of control. This model is useful when a server typically receives requests of long duration from multiple clients.

How does a spring reactor work?

Spring Reactor introduced a new web client to make web requests called WebClient. Compared to RestTemplate, this client has a more functional feel and is fully reactive. It's included in the spring-boot-starter-weblux dependency and it's build to replace RestTemplate in a non-blocking way.


2 Answers

Reactive programming is a programming paradigm and as such it doesn't make any assumptions about the technical implementation.

The reactive manifesto describes reactive systems and brings asynchronous communication and backpressure on the table. Other than that it also makes no assumptions about technical details.

Spring Reactor, the foundation of Webflux, is a library that allows you to easily build reactive systems and follow the reactive programming paradigm.

The thread that is used by a stream depends on the publisher. The default is to use the current thread. Without any intervention, the stream cannot be asynchronous if a publisher is synchronous. And the stream is blocking if a publisher blocks. But take the following example:

Flux.interval(Duration.ofMillis(100))
    .take(2)
    .subscribe(i -> System.out.println(Thread.currentThread().getName()));

Flux.interval publishes on another thread and so the chain runs asynchronously in another thread.

Let's look at another example:

Scheduler scheduler = Schedulers.newElastic("foo");

Flux<Integer> flux = Flux.just(1, 2)
    .subscribeOn(scheduler);

flux.subscribe(i -> System.out.println(Thread.currentThread().getName()));
flux.subscribe(i -> System.out.println(Thread.currentThread().getName()));

You will notice that each subscriber runs on its own thread (from the same thread pool though). The publishOn operator is similar.

If you subscribe to a publisher you can use the same programming paradigm regardless of whether it's synchronous or asynchronous. And you can always introduce asynchronous behavior by adding a subscribeOn or publishOn operator.

like image 68
a better oliver Avatar answered Oct 12 '22 07:10

a better oliver


TL; DR:

  1. It's a Project Reactor thing, Spring-Webflux doesn't decide what operation runs on which thread.
  2. Project Reactor makes it easier to tell where you're crossing the thread boundaries. Also, there's no (explicit) synchronization going on; making it harder to introduce concurrency problems.

No, it isn't a single threaded model with a different syntax. Project Reactor tries as much as possible to use your main thread to avoid context-switches. In addition, it provides special operators which lets you specify the threads that previous operations run on.

For instance, this modified example would run on different threads; as subscribeOn operator defines which thread pool the whole chain runs on:

Mono.just("ONE")
    .map(item -> func(" A " + item))
    .map(item -> func(" B " + item))
    .map(item -> func(" C " + item))
    .subscribeOn(Schedulers.elastic())
    .subscribe(item -> {
        System.out.println(Thread.currentThread().getName() + " " + item);
    });

Mono.just("TWO")
    .map(item -> func(" A " + item))
    .map(item -> func(" B " + item))
    .map(item -> func(" C " + item))
    .subscribeOn(Schedulers.elastic())
    .subscribe(item -> {
        System.out.println(Thread.currentThread().getName() + " " + item);
    });

In this case, both operations execute on an elastic-x thread; not blocking the main thread. The order of the operations might vary with each run.

like image 32
MuratOzkan Avatar answered Oct 12 '22 07:10

MuratOzkan