Spring 5 introduces the reactive programming style for REST APIs with WebFlux. I'm fairly new to it myself and was wondering whether wrapping synchronous calls to a database in Flux or Mono makes sense performance-wise. If so, is this the way to do it:
    @RestController
    public class HomeController {

        private MeasurementRepository repository;

        public HomeController(MeasurementRepository repository) {
            this.repository = repository;
        }

        @GetMapping(value = "/v1/measurements")
        public Flux<Measurement> getMeasurements() {
            return Flux.fromIterable(repository.findByFromDateGreaterThanEqual(new Date(1486980000L)));
        }
    }
Is there something like an asynchronous CrudRepository? I couldn't find it.
What is Spring WebFlux? Spring WebFlux is the reactive counterpart to Spring MVC and supports fully non-blocking reactive streams. It supports back pressure and uses Netty as the built-in server to run reactive applications. If you are familiar with the Spring MVC programming style, you can easily work with WebFlux as well.
Regarding scalability, Spring Async gives better results than a synchronous Spring MVC implementation. Spring WebFlux, because of its reactive nature, provides elasticity and higher availability.
Use a simple domain model – Employee with an id and a name field. Build a REST API with a RestController to publish Employee resources as a single resource and as a collection. Build a client with WebClient to retrieve the same resource. Create a secured reactive endpoint using WebFlux and Spring Security.
There are several reasons for this: Spring MVC can't run on Netty; both infrastructures will compete for the same job (for example, serving static resources, the mappings, etc.); and mixing both runtime models within the same container is not a good idea and is likely to perform badly or not work at all.
One option would be to use alternative SQL clients that are fully non-blocking. Some examples include https://github.com/mauricio/postgresql-async and https://github.com/finagle/roc. Of course, none of these drivers is officially supported by database vendors yet. Also, their functionality is far less attractive compared to mature JDBC-based abstractions such as Hibernate or jOOQ.
The alternative idea came to me from the Scala world. The idea is to dispatch blocking calls to an isolated thread pool so that blocking and non-blocking calls are not mixed together. This lets us control the overall number of threads and lets the CPU serve non-blocking tasks in the main execution context, with some potential optimizations. Assuming that we have a JDBC-based implementation such as Spring Data JPA, which is indeed blocking, we can make its execution asynchronous and dispatch it on the dedicated thread pool.
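    import java.util.Date;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Scheduler;

    @RestController
    public class HomeController {

        private final MeasurementRepository repository;
        private final Scheduler scheduler;

        public HomeController(MeasurementRepository repository,
                              @Qualifier("jdbcScheduler") Scheduler scheduler) {
            this.repository = repository;
            this.scheduler = scheduler;
        }

        @GetMapping(value = "/v1/measurements")
        public Flux<Measurement> getMeasurements() {
            // subscribeOn (rather than publishOn) makes the blocking repository call itself run on the JDBC scheduler.
            return Mono.fromCallable(() -> repository.findByFromDateGreaterThanEqual(new Date(1486980000L)))
                    .subscribeOn(scheduler)
                    // The Mono carries the whole result collection; flatten it to match the Flux return type.
                    .flatMapMany(Flux::fromIterable);
        }
    }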
Our Scheduler for JDBC should be backed by a dedicated thread pool whose size equals the number of connections.
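    import java.util.concurrent.Executors;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import reactor.core.scheduler.Scheduler;
    import reactor.core.scheduler.Schedulers;

    @Configuration
    public class SchedulerConfiguration {

        private final Integer connectionPoolSize;

        public SchedulerConfiguration(@Value("${spring.datasource.maximum-pool-size}") Integer connectionPoolSize) {
            this.connectionPoolSize = connectionPoolSize;
        }

        @Bean
        public Scheduler jdbcScheduler() {
            // One worker thread per pooled connection, so no thread ever waits for a connection.
            return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
        }
    }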
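To see why the pool size matters, here is a minimal JDK-only sketch of the same idea, with no Spring or Reactor involved. The class name, the method names and the 20 ms sleep that stands in for a blocking JDBC call are all made up for illustration. It submits more blocking calls than there are threads and records how many were in flight at once, which should never exceed the pool size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class DedicatedPoolSketch {

    // Submits `calls` simulated blocking queries to a pool of `poolSize` threads
    // and returns the highest number of queries that were in flight at once.
    static int maxConcurrentQueries(int poolSize, int calls) {
        ExecutorService jdbcPool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int i = 0; i < calls; i++) {
                futures.add(CompletableFuture.runAsync(() -> {
                    int now = inFlight.incrementAndGet();
                    maxSeen.accumulateAndGet(now, Math::max);
                    sleepQuietly(20); // hypothetical stand-in for a blocking JDBC call
                    inFlight.decrementAndGet();
                }, jdbcPool));
            }
            // Blocking here is for the demo only; a reactive caller would compose the futures instead.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return maxSeen.get();
        } finally {
            jdbcPool.shutdown();
        }
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("max in flight: " + maxConcurrentQueries(4, 20));
    }
}
```

With the pool sized to the number of connections, every running task can get a connection immediately, and any extra requests queue in the executor rather than blocking a thread while waiting for one.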
However, there are difficulties with this approach. The main one is transaction management. In JDBC, transactions are possible only within a single java.sql.Connection. To make several operations in one transaction, they have to share a connection. If we want to make some calculations in between them, we have to keep the connection. This is not very effective, as we keep a limited number of connections idle while doing calculations in between.
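The cost of holding a connection across a computation can be shown with a toy model. This is only a sketch under stated assumptions: a Semaphore stands in for the connection pool (one permit per connection), and the class and method names are invented. No real JDBC is involved.

```java
import java.util.concurrent.Semaphore;

class HeldConnectionSketch {

    // Returns {availableDuringComputation, availableAfterCommit} for a pool of the given size
    // in which every connection is held by a transaction that is busy computing.
    static boolean[] simulate(int poolSize) {
        Semaphore pool = new Semaphore(poolSize); // each permit models one JDBC connection

        // Every transaction borrows a connection and runs its first statement...
        for (int i = 0; i < poolSize; i++) {
            if (!pool.tryAcquire()) {
                throw new IllegalStateException("pool exhausted too early");
            }
        }

        // ...then computes in memory while still holding its connection.
        // A new request arriving now finds no free connection, although none is executing SQL.
        boolean availableDuringComputation = pool.tryAcquire();

        // One transaction runs its second statement, commits and returns its connection.
        pool.release();
        boolean availableAfterCommit = pool.tryAcquire();

        return new boolean[] { availableDuringComputation, availableAfterCommit };
    }

    public static void main(String[] args) {
        boolean[] r = simulate(2);
        System.out.println("free connection during computation: " + r[0]); // false
        System.out.println("free connection after commit:       " + r[1]); // true
    }
}
```

The connections are idle from the database's point of view during the computation, yet they remain unavailable to every other request until the transaction ends.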
This idea of an asynchronous JDBC wrapper is not new and is already implemented in the Scala library Slick 3. Finally, non-blocking JDBC may come along on the Java roadmap: it was announced at JavaOne in September 2016, and it is possible that we will see it in Java 10.
Based on this blog, you should rewrite your snippet in the following way:
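    @GetMapping(value = "/v1/measurements")
    public Flux<Measurement> getMeasurements() {
        // defer postpones the blocking repository call until subscription,
        // so that subscribeOn can move it off the calling thread.
        // Note: Schedulers.elastic() is deprecated as of Reactor 3.4 in favor of Schedulers.boundedElastic().
        return Flux.defer(() -> Flux.fromIterable(repository.findByFromDateGreaterThanEqual(new Date(1486980000L))))
                .subscribeOn(Schedulers.elastic());
    }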