Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Wrapping blocking I/O in project reactor

I have a spring-webflux API which, at a service layer, needs to read from an existing repository which uses JDBC.

Having done some reading on the subject, I would like to keep the execution of the blocking database call separate from the rest of my non-blocking async code.

I have defined a dedicated jdbcScheduler:

@Bean
public Scheduler jdbcScheduler() {
    return Schedulers.fromExecutor(Executors.newFixedThreadPool(maxPoolSize));
}

And an AsyncWrapper utility to use it:

@Component
public class AsyncJdbcWrapper {

    private final Scheduler jdbcScheduler;

    @Autowired
    public AsyncJdbcWrapper(Scheduler jdbcScheduler) {

        this.jdbcScheduler = jdbcScheduler;
    }

    public <T> Mono<T> async(Callable<T> callable) {
        return Mono.fromCallable(callable)
                .subscribeOn(jdbcScheduler)
                .publishOn(Schedulers.parallel());
    }
}

Which is then used to wrap jdbc calls like so:

Mono<Integer> userIdMono = asyncWrapper.async(() -> userDao.getUserByUUID(request.getUserId()))
                .map(userOption -> userOption.map(u -> u.getId())
                        .orElseThrow(() -> new IllegalArgumentException("Unable to find user with ID " + request.getUserId())));

I've got two questions:

1) Am I correctly pushing the execution of blocking calls to another set of threads? Being fairly new to this stuff I'm struggling with the intricacies of subscribeOn()/publishOn().

2) Say I want to make use of the resulting mono, e.g call an API with the result of the userIdMono, on which scheduler will that be executed? The one specifically created for the jdbc calls, or the main(?) thread that reactor usually operates within? e.g.

userIdMono.map(id -> someApiClient.call(id));
like image 426
f1dave Avatar asked Nov 27 '17 07:11

f1dave


People also ask

Is Mono subscribe blocking?

As you know, Mono is an asynchronous call that executes in a non-blocking way.

Is flatMap blocked?

Once a new stream – represented by a Publisher instance – is ready, flatMap eagerly subscribes. The operator doesn't wait for the publisher to finish before moving on to the next stream, meaning the subscription is non-blocking.

What is difference between mono and flux?

A Flux object represents a reactive sequence of 0.. N items, whereas a Mono object represents a single value or an empty (0..1) result. Most times, you expect exactly one result or no (zero) result, and not a collection that contains possibly multiple results. In such scenarios, it's more convenient to have a Mono.

What is publisher in reactor?

Project Reactor uses the 4 basic blocks of the publish/subscribe pattern: Publisher: The component that has a piece of data to publish to any subscribers. Subscriber: The component that subscribes for some kind of data from one or more publishers.


1 Answers

1) Use of subscribeOn is correctly putting the JDBC work on the jdbcScheduler

2) Neither, the results of the Callable - while computed on the jdbcScheduler, are publishOn the parallel Scheduler, so your map will be executed on a thread from the Schedulers.parallel() pool (rather than hogging the jdbcScheduler).

like image 124
Simon Baslé Avatar answered Nov 16 '22 01:11

Simon Baslé