Newbie alert to Spring Webflux (v 2.0.1.RELEASE).
I'd like to use Spring Webflux for a back end (Webless) application for processing a large amount of data from a JMS listener.
My understanding is Spring Webflux provides an ]non-blocking/async concurrency model. However, I got a basic question for which I need some help. As a disclaimer, this whole concept of reactive programming is very new to me and I'm still in the process of this paradigm shift.
Consider this code:
Mono.just("ONE")
.map(item -> func(" A " + item))
.map(item -> func(" B " + item))
.map(item -> func(" C " + item))
.subscribe(System.out::println);
Mono.just("TWO")
.map(item -> func(" A " + item))
.map(item -> func(" B " + item))
.map(item -> func(" C " + item))
.subscribe(System.out::println);
I understand from the docs that nothing happens to the event processing chain until a "subscribe" function is called upon.
But internally, does the spring use (if it wishes) use separate threads asynchronously for every function inside the "map" function? If spring uses a "single" thread for these chains, then what's the real purpose here? Isn't it a blocking and single threaded model based on a different syntax?
I observe that the code always behaves sequentially and with the same thread. What's the threading model of spring webflux?
Spring WebFlux offers a mechanism to switch processing to a different thread pool in between a data flow chain. This can provide us with precise control over the scheduling strategy that we want for certain tasks.
Use Spring MVC with WebFlux to build Java-based web applications. Employ the various Spring MVC architectures. Work with controllers and routing functions. Build microservices and web services using Spring MVC and REST.
Thread-Per-Request Model. In this model, each request from a client is processed in a different thread of control. This model is useful when a server typically receives requests of long duration from multiple clients.
Spring Reactor introduced a new web client to make web requests called WebClient. Compared to RestTemplate, this client has a more functional feel and is fully reactive. It's included in the spring-boot-starter-weblux dependency and it's build to replace RestTemplate in a non-blocking way.
Reactive programming is a programming paradigm and as such it doesn't make any assumptions about the technical implementation.
The reactive manifesto describes reactive systems and brings asynchronous communication and backpressure on the table. Other than that it also makes no assumptions about technical details.
Spring Reactor, the foundation of Webflux, is a library that allows you to easily build reactive systems and follow the reactive programming paradigm.
The thread that is used by a stream depends on the publisher. The default is to use the current thread. Without any intervention, the stream cannot be asynchronous if a publisher is synchronous. And the stream is blocking if a publisher blocks. But take the following example:
Flux.interval(Duration.ofMillis(100))
.take(2)
.subscribe(i -> System.out.println(Thread.currentThread().getName()));
Flux.interval
publishes on another thread and so the chain runs asynchronously in another thread.
Let's look at another example:
Scheduler scheduler = Schedulers.newElastic("foo");
Flux<Integer> flux = Flux.just(1, 2)
.subscribeOn(scheduler);
flux.subscribe(i -> System.out.println(Thread.currentThread().getName()));
flux.subscribe(i -> System.out.println(Thread.currentThread().getName()));
You will notice that each subscriber runs on its own thread (from the same thread pool though). The publishOn
operator is similar.
If you subscribe to a publisher you can use the same programming paradigm regardless of whether it's synchronous or asynchronous. And you can always introduce asynchronous behavior by adding a subscribeOn
or publishOn
operator.
TL; DR:
No, it isn't a single threaded model with a different syntax. Project Reactor tries as much as possible to use your main thread to avoid context-switches. In addition, it provides special operators which lets you specify the threads that previous operations run on.
For instance, this modified example would run on different threads; as subscribeOn
operator defines which thread pool the whole chain runs on:
Mono.just("ONE")
.map(item -> func(" A " + item))
.map(item -> func(" B " + item))
.map(item -> func(" C " + item))
.subscribeOn(Schedulers.elastic())
.subscribe(item -> {
System.out.println(Thread.currentThread().getName() + " " + item);
});
Mono.just("TWO")
.map(item -> func(" A " + item))
.map(item -> func(" B " + item))
.map(item -> func(" C " + item))
.subscribeOn(Schedulers.elastic())
.subscribe(item -> {
System.out.println(Thread.currentThread().getName() + " " + item);
});
In this case, both operations execute on an elastic-x
thread; not blocking the main thread. The order of the operations might vary with each run.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With