I am using Spring Webflux with Spring data jpa using PostgreSql as backend db.
I don't want to block the main thread while making db calls like find
and save
.
To achieve the same, I have a main scheduler in Controller
class and a jdbcScheduler
service classes.
The way I have defined them is:
@Configuration
@EnableJpaAuditing
public class CommonConfig {
@Value("${spring.datasource.hikari.maximum-pool-size}")
int connectionPoolSize;
@Bean
public Scheduler scheduler() {
return Schedulers.parallel();
}
@Bean
public Scheduler jdbcScheduler() {
return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
}
@Bean
public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) {
return new TransactionTemplate(transactionManager);
}
}
Now, while doing a get/save call in my service layer I do:
@Override
public Mono<Config> getConfigByKey(String key) {
return Mono.defer(
() -> Mono.justOrEmpty(configRepository.findByKey(key)))
.subscribeOn(jdbcScheduler)
.publishOn(scheduler);
}
@Override
public Flux<Config> getAllConfigsAfterAppVersion(int appVersion) {
return Flux
.fromIterable(configRepository.findAllByMinAppVersionIsGreaterThanEqual(appVersion))
.subscribeOn(jdbcScheduler)
.publishOn(scheduler);
}
@Override
public Flux<Config> addConfigs(List<Config> configList) {
return Flux.fromIterable(configRepository.saveAll(configList))
.subscribeOn(jdbcScheduler)
.publishOn(scheduler);
}
And in controller, I do:
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
Mono<ResponseDto<List<Config>>> addConfigs(@Valid @RequestBody List<Config> configs) {
return configService.addConfigs(configs).collectList()
.map(configList -> new ResponseDto<>(HttpStatus.CREATED.value(), configList, null))
.subscribeOn(scheduler);
}
Is this correct? and/or there is a way better way to do it?
What I understand by:
.subscribeOn(jdbcScheduler)
.publishOn(scheduler);
is that task will run on jdbcScheduler
threads and later result will be published on my main parallel scheduler
. Is this understanding correct?
Your understanding is correct with regards to publishOn
and subscribeOn
(see reference documentation in the reactor project about those operators).
If you call blocking libraries without scheduling that work on a specific scheduler, those calls will block one of the few threads available (by default, the Netty event loop) and your application will only be able to serve a few requests concurrently.
Now I'm not sure what you're trying to achieve by doing that.
First, the parallel scheduler is designed for CPU bound tasks, meaning you'll have few of them, as many (or a bit more) as CPU cores. In this case, it's like setting your threadpool size to the number of cores on a regular Servlet container. Your app won't be able to process a large number of concurrent requests.
Even if you choose a better alternative (like the elastic Scheduler), it will be still not as good as the Netty event loop, which is where request processing is scheduled natively in Spring WebFlux.
If your ultimate goal is performance and scalability, wrapping blocking calls in a reactive app is likely to perform worse than your regular Servlet container.
You could instead use Spring MVC and:
Mono
and Flux
return types when you're not tied to such librariesThis won't be non-blocking, but this will be asynchronous still and you'll be able to do more work in parallel without dealing with the complexity.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With