Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Understanding Spring's Web Reactive Framework

I am currently developing an application with SpringBoot 2, spring-boot-starter-webflux on netty and jOOQ.
Below is the code that I have come up with after hours of research and stackoverflow searches. I have built in a lot of logging in order to see what's happening on which thread.

UserController:

@RequestMapping(value = "/user", method = RequestMethod.POST)
public Mono<ResponseEntity<Integer>> createUser(@RequestBody ImUser user) {
    return Mono.just(user)
            .map(it -> {
                logger.debug("Receiving request on thread: " + Thread.currentThread().getName());
                return it;
            })
            .map(userService::create)
            .map(it -> {
                logger.debug("Sending response on thread: " + Thread.currentThread().getName());
                return ResponseEntity.status(HttpStatus.CREATED).body(it);
            })
            .mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
}

UserService:

public int create(ImUser user) {
    return Mono.just(user)
            .subscribeOn(Schedulers.elastic())
            .map(u -> {
                logger.debug("UserService thread: " + Thread.currentThread().getName());
                return imUserDao.insertUser(u);
            })
            .block();
}

UserDao:

@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)
public int insertUser(ImUser user) {
    logger.debug("Insert DB on thread: " + Thread.currentThread().getName());
    return dsl.insertInto(IM_USER,IM_USER.VERSION, IM_USER.FIRST_NAME, IM_USER.LAST_NAME, IM_USER.BIRTHDATE, IM_USER.GENDER)
            .values(1, user.getFirstName(), user.getLastName(), user.getBirthdate(), user.getGender())
            .returning(IM_USER.ID)
            .fetchOne()
            .getId();
}

The code works as expected, "Receiving request" and "Sending response" both run on the same thread (reactor-http-server-epoll-x) while the blocking code ( the call to imUserDao.insertUser(u) ) runs on an elastic Scheduler thread (elastic-x). The transaction is bound to the thread on which the annotated method is called (which is elastic-x) and thus works as expected (I have tested it with a different method which is not posted here, to keep things simple).

Here is a log sample:

20:57:21,384 DEBUG         admin.UserController| Receiving request on thread: reactor-http-server-epoll-7
20:57:21,387 DEBUG            admin.UserService| UserService thread: elastic-2
20:57:21,391 DEBUG        admin.ExtendedUserDao| Insert DB on thread: elastic-2
20:57:21,393 DEBUG         tools.LoggerListener| Executing query          
...
20:57:21,401 DEBUG              tools.StopWatch| Finishing                : Total: 9.355ms, +3.355ms
20:57:21,409 DEBUG         admin.UserController| Sending response on thread: reactor-http-server-epoll-7

I have researched reactive programming for a long time now, but never quite got to program anything reactive. Now that I am, I am wondering if I am doing it correctly. So here are my questions:

1. Is the code above a good way to handle incoming HTTP requests, query the DB and then respond? Please ignore the logger.debug(...) calls which I have built in for the sake of my sanity :) I kind of expected to have a Flux< ImUser> as the argument to the controller method, in the sense that I have a stream of multiple potential requests that will come at some point and will all be handled in the same way. Instead, the examples that I have found create a Mono.from(...); every time a request comes in.

2. The second Mono created in the UserService ( Mono.just(user) ) feels somewhat awkward. I understand that I need to start a new stream to be able to run code on the elastic Scheduler, but isn't there an operator that does this?

3. From the way the code is written, I understand that the Mono inside the UserService will be blocked until the DB operation finishes, but the original stream, which serves the requests, isn't blocked. Is this correct?

4. I plan to replace Schedulers.elastic() with a parallel Scheduler where I can configure the number of worker threads. The idea is that the number of maximum worker threads should be the same as maximum DB connections. What will happen when all worker threads inside the Scheduler will be busy? Is that when backpressure jumps in?

5. I initially expected to have this code inside my controller:

return userService.create(user)
            .map(it -> ResponseEntity.status(HttpStatus.CREATED).body(it))
            .mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));

but I have not been able to achieve that AND keep the things running in the correct threads. Is there any way to achieve this inside my code?

Any help would be greatly appreciated. Thanks!

like image 312
Liviu Ilea Avatar asked Mar 30 '17 23:03

Liviu Ilea


People also ask

What is reactive web framework?

In plain terms reactive programming is about non-blocking applications that are asynchronous and event-driven and require a small number of threads to scale. A key aspect of that definition is the concept of backpressure which is a mechanism to ensure producers don't overwhelm consumers.

What is reactive WebClient?

WebClient is a non-blocking, reactive client for performing HTTP requests with Reactive Streams back pressure. WebClient provides a functional API that takes advantage of Java 8 Lambdas. By default, WebClient uses Reactor Netty as the HTTP client library. But others can be plugged in through a custom.

What is difference between Spring web and Spring reactive web?

"Spring Web" is based on a conventional blocking style with Servlet, and "Spring Reactive Web" is a new style with reactive programming.

How do you explain Spring Framework?

The Spring Framework (Spring) is an open-source application framework that provides infrastructure support for developing Java applications. One of the most popular Java Enterprise Edition (Java EE) frameworks, Spring helps developers create high performing applications using plain old Java objects (POJOs).


1 Answers

Service and Controller
The fact that your service is blocking is problematic, because then in the controller you are calling a blocking method from inside a map that isn't moved on a separate thread. This has the potential to block all controllers.

What you could do instead is return a Mono from UserService#create (remove the block() at the end). Since the service ensures that the Dao method call is isolated, it is less problematic. From there, no need to do Mono.just(user) in the Controller: just call create and start chaining operators directly on the resulting Mono:

@RequestMapping(value = "/user", method = RequestMethod.POST)
public Mono<ResponseEntity<Integer>> createUser(@RequestBody ImUser user) {
    //this log as you saw was executed in the same thread as the controller method
    logger.debug("Receiving request on thread: " + Thread.currentThread().getName());
    return userService.create(user)
        .map(it -> {
            logger.debug("Sending response on thread: " + Thread.currentThread().getName());
            return ResponseEntity.status(HttpStatus.CREATED).body(it);
        })
        .mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
}

Logging
Note that if you want to log something there are a couple better options than doing a map and returning it:

  • doOnNext method is tailored for that: react to one of the reactive signals (in this instance, onNext: a value is emitted) and perform some non-mutating action, leaving the output sequence exactly the same as the source sequence. The "side-effect" of the doOn can be writing to the console or incrementing statistic counters for instance... There's also doOnComplete, doOnError, doOnSubscribe, doOnCancel, etc...

  • log simply logs all events in the sequence above it. It will detect if you use SLF4J and use the configured logger at DEBUG level if so. Otherwise it'll use the JDK Logging features (so you also need to configure that to display DEBUG level logs).

A word about transactions or rather anything relying on ThreadLocal
ThreadLocal and thread-stickiness can be problematic in reactive programming, because there's less guarantee of the underlying execution model staying the same throughout a whole sequence. A Flux can execute in several steps, each in a different Scheduler (and so thread or thread pool). Even at a specific step, one value could be processed by thread A of the underlying thread pool while the next one, arriving later on, would be processed on thread B.

Relying on Thread Local is less straightforward in this context, and we are currently actively working on providing alternatives that fit better in the reactive world.

Your idea of making a pool of the size of the connection pool is good, but not necessarily sufficient, with the potential of several threads being used by a transactional flux, thus maybe polluting some threads with the transaction.

What happens when a pool runs out of threads
If you are using a particular Scheduler to isolate blocking behavior like here, once it runs out of threads it would throw a RejectedExecutionException.

like image 167
Simon Baslé Avatar answered Oct 22 '22 05:10

Simon Baslé