I have a spring-webflux API which, at a service layer, needs to read from an existing repository which uses JDBC.
Having done some reading on the subject, I would like to keep the execution of the blocking database call separate from the rest of my non-blocking async code.
I have defined a dedicated jdbcScheduler:
@Bean
public Scheduler jdbcScheduler() {
return Schedulers.fromExecutor(Executors.newFixedThreadPool(maxPoolSize));
}
And an AsyncWrapper utility to use it:
@Component
public class AsyncJdbcWrapper {
private final Scheduler jdbcScheduler;
@Autowired
public AsyncJdbcWrapper(Scheduler jdbcScheduler) {
this.jdbcScheduler = jdbcScheduler;
}
public <T> Mono<T> async(Callable<T> callable) {
return Mono.fromCallable(callable)
.subscribeOn(jdbcScheduler)
.publishOn(Schedulers.parallel());
}
}
Which is then used to wrap jdbc calls like so:
Mono<Integer> userIdMono = asyncWrapper.async(() -> userDao.getUserByUUID(request.getUserId()))
.map(userOption -> userOption.map(u -> u.getId())
.orElseThrow(() -> new IllegalArgumentException("Unable to find user with ID " + request.getUserId())));
I've got two questions:
1) Am I correctly pushing the execution of blocking calls to another set of threads? Being fairly new to this stuff I'm struggling with the intricacies of subscribeOn()/publishOn().
2) Say I want to make use of the resulting mono, e.g call an API with the result of the userIdMono, on which scheduler will that be executed? The one specifically created for the jdbc calls, or the main(?) thread that reactor usually operates within? e.g.
userIdMono.map(id -> someApiClient.call(id));
As you know, Mono is an asynchronous call that executes in a non-blocking way.
Once a new stream – represented by a Publisher instance – is ready, flatMap eagerly subscribes. The operator doesn't wait for the publisher to finish before moving on to the next stream, meaning the subscription is non-blocking.
A Flux object represents a reactive sequence of 0.. N items, whereas a Mono object represents a single value or an empty (0..1) result. Most times, you expect exactly one result or no (zero) result, and not a collection that contains possibly multiple results. In such scenarios, it's more convenient to have a Mono.
Project Reactor uses the 4 basic blocks of the publish/subscribe pattern: Publisher: The component that has a piece of data to publish to any subscribers. Subscriber: The component that subscribes for some kind of data from one or more publishers.
1) Use of subscribeOn
is correctly putting the JDBC work on the jdbcScheduler
2) Neither, the results of the Callable
- while computed on the jdbcScheduler, are publishOn
the parallel
Scheduler, so your map
will be executed on a thread from the Schedulers.parallel()
pool (rather than hogging the jdbcScheduler
).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With