How to use Project Reactor's Scheduler with Executor based libraries?

Project Reactor provides a great way to define which thread pool code runs on, by defining a Scheduler. It also provides a bridge to libraries that use CompletableFutures through Mono.fromFuture(..).

AWS's async client for DynamoDB executes the CompletableFutures it returns from API calls on a java.util.concurrent.Executor. By default, it creates an Executor backed by a thread pool that it also creates. The result is that even streams with a defined Scheduler, like Mono.fromFuture(..).subscribeOn(Schedulers.boundedElastic()), execute on a thread from the pool the library creates instead of one from Schedulers.boundedElastic(). So we're seeing thread names like sdk-async-response-0-2 instead of names like boundedElastic-1.

Fortunately, the library allows us to provide our own Executor as shown here, so my question is:

How do you build an Executor that uses a thread from the Scheduler defined on that part of the stream at runtime?

Use Case

We have a repository class with a findById method, and we need the caller to be able to control which Scheduler it runs on, because it's used in these two distinctly different contexts:

  1. API responses that are run on the Schedulers.boundedElastic() scheduler.
  2. Handling Kafka messages that are executed in order, on a thread per partition, from a defined scheduler as shown in the Reactor Kafka docs.

Attempts

We've tried defining an Executor using both Schedulers.immediate() and Runnable::run as shown here, but both result in executing on the Netty event loop thread (example name: aws-java-sdk-NettyEventLoop-0-2), not a thread from the defined Scheduler.

DynamoDbAsyncClient.builder()
    .asyncConfiguration(builder -> builder.advancedOption(
        SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
        runnable -> Schedulers.immediate().schedule(runnable)
    ))
    .build();

DynamoDbAsyncClient.builder()
    .asyncConfiguration(builder -> builder.advancedOption(
        SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
        Runnable::run
    ))
    .build();
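The behavior of both attempts can be reproduced without the SDK. The JDK-only sketch below assumes, as a simplification, that the client hands each response to the completion executor from its own I/O thread; fake-event-loop is a hypothetical stand-in for a name like aws-java-sdk-NettyEventLoop-0-2. A direct executor such as Runnable::run runs the task on whichever thread submits it, and an executor that just delegates to Schedulers.immediate() behaves the same way.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class DirectExecutorDemo {
    // Returns the name of the thread that ran the callback handed to completionExecutor.
    static String callbackThread(Executor completionExecutor) {
        CompletableFuture<String> ioResult = new CompletableFuture<>();

        // Forward the raw result through the completion executor, loosely mimicking
        // how an async client completes the future it returns to the caller.
        CompletableFuture<String> returned = ioResult.thenApplyAsync(
            value -> Thread.currentThread().getName(), completionExecutor);

        // Stand-in for the SDK's I/O thread (hypothetical name) delivering the response.
        new Thread(() -> ioResult.complete("item"), "fake-event-loop").start();

        return returned.join();
    }

    public static void main(String[] args) {
        // A direct executor runs the callback on whichever thread submits it.
        System.out.println(callbackThread(Runnable::run)); // prints fake-event-loop
    }
}
```

Passing a real pool instead (for example Executors.newSingleThreadExecutor with a named thread factory) moves the callback onto that pool's thread, which is why the executor you supply decides where completions run.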
peterl asked Mar 21 '20 00:03




1 Answer

Part 1. Observe vs. Subscribe

Looking into the question, I see the need to observe elements after execution on a particular thread. To be precise, observe in this context means being able to work on a value in the stream on some specific thread. In RxJava, there is an operator named precisely that (observeOn), but in Project Reactor, the identical operation is called publishOn.

Thus, if you want to process data on Schedulers.boundedElastic(), you should use the following construction:

Mono.fromFuture(..)
    .publishOn(Schedulers.boundedElastic())
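As a small check of that construction, the sketch below assumes reactor-core on the classpath and uses a CompletableFuture completed by a plain thread with the hypothetical name library-thread, standing in for the SDK's pool. Whatever thread completes the future, the map after publishOn runs on a boundedElastic worker.

```java
import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class PublishOnDemo {
    // Returns the name of the thread on which the value was processed.
    static String processingThread() {
        // A future completed by a non-Reactor thread, standing in for a library-managed pool.
        CompletableFuture<String> future = CompletableFuture.supplyAsync(
            () -> "item",
            task -> new Thread(task, "library-thread").start());

        return Mono.fromFuture(future)
            .publishOn(Schedulers.boundedElastic()) // everything below runs on boundedElastic
            .map(value -> Thread.currentThread().getName())
            .block();
    }

    public static void main(String[] args) {
        System.out.println(processingThread()); // a name starting with boundedElastic-
    }
}
```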

But wait, .subscribeOn worked as well?

Reading the previous construction, you may start worrying, because you are 100% sure that

Mono.fromRunnable(..)
    .subscribeOn(Schedulers.boundedElastic())

runs the runnable and signals completion on the thread boundedElastic-1, so what is wrong with fromFuture?

And here comes the trick:

Never use subscribeOn with Futures / CompletableFuture or anything that may use its own async mechanism underneath.

If you look at what is going on behind subscribeOn, you will find something like the following:

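// Simplified version of the SubscribeOn operator
final Scheduler scheduler;   // the Scheduler passed to subscribeOn(..)
final Publisher<T> parent;   // the upstream source

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    // subscribing to the upstream happens on a Scheduler thread
    scheduler.schedule(() -> parent.subscribe(actual));
}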

This basically means the parent's subscribe method will be called on a separate thread (a worker of the given Scheduler).

Such a technique works for fromRunnable, fromSupplier, and fromCallable because their logic happens in the subscribe method:

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    Operators.MonoSubscriber<T, T> sds = new Operators.MonoSubscriber<>(actual);

    actual.onSubscribe(sds);
    // skipped some parts
    T t = supplier.get();
    if (t == null) {
        sds.onComplete();
    }
    else {
        sds.complete(t);
    }
}

which means that, under subscribeOn, it is almost equal to

scheduler.schedule(() -> {
    // the whole supplier call and its signals run on the Scheduler's thread
    T t = supplier.get();
    if (t == null) {
        sds.onComplete();
    }
    else {
        sds.complete(t);
    }
});
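A minimal check of this, assuming reactor-core on the classpath: the Callable body reports the thread it runs on, and because fromCallable does its work inside subscribe, subscribeOn moves that work to a boundedElastic worker.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnCallableDemo {
    // Returns the name of the thread on which the Callable body ran.
    static String callableThread() {
        return Mono.fromCallable(() -> Thread.currentThread().getName())
            // subscribe(..), and therefore the Callable, runs on a boundedElastic worker
            .subscribeOn(Schedulers.boundedElastic())
            .block();
    }

    public static void main(String[] args) {
        System.out.println(callableThread()); // a name starting with boundedElastic-
    }
}
```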

In contrast, fromFuture is much trickier. A short quiz.

On which thread may we observe the value? (Assume the code runs on thread Main and the task executes on the ForkJoinPool.)

var future = CompletableFuture
    .supplyAsync(() -> {
        return value;
    });
// ... some code here, does not matter what

future.thenAccept(v -> {
    System.out.println(Thread.currentThread());
});

And the correct answer.... 🥁🥁🥁🥁🥁🥁

It may be thread Main,
or it may be a thread from the ForkJoinPool...

...because it is racy. If the value has already been delivered by the time we attach the consumer, we just read a volatile field on the reader thread and run the consumer right there (thread Main). Otherwise, thread Main only registers the consumer, and it is invoked later on the ForkJoinPool thread that completes the future.
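The two outcomes can be pinned down deterministically by controlling when the consumer is attached relative to completion. This JDK-only sketch uses a plain thread with the hypothetical name completer instead of the ForkJoinPool so the thread names are predictable.

```java
import java.util.concurrent.CompletableFuture;

public class RacyCallbackDemo {
    // Consumer attached to an already-completed future: runs on the attaching thread.
    static String alreadyCompleted() {
        CompletableFuture<String> future = CompletableFuture.completedFuture("value");
        return future.thenApply(v -> Thread.currentThread().getName()).join();
    }

    // Consumer attached before completion: runs on the thread that completes the future.
    static String completedLater() {
        CompletableFuture<String> future = new CompletableFuture<>();
        CompletableFuture<String> observed =
            future.thenApply(v -> Thread.currentThread().getName());
        // Stand-in for a ForkJoinPool worker.
        new Thread(() -> future.complete("value"), "completer").start();
        return observed.join();
    }

    public static void main(String[] args) {
        System.out.println(alreadyCompleted()); // main
        System.out.println(completedLater());   // completer
    }
}
```

In the quiz, which of these two cases you land in depends purely on timing, which is the race described above.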

Right, that is why, when you use fromFuture with subscribeOn, there is no guarantee that the value of the given CompletableFuture will be observed on the subscribeOn thread.
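To make the failure visible, the sketch below (assuming reactor-core on the classpath) subscribes first and completes the future afterwards from a thread with the hypothetical name library-thread. The busy-wait on getNumberOfDependents, a JDK monitoring method, is only there to guarantee that fromFuture has attached its callback before completion; it assumes that fromFuture registers a dependent stage on the future, which is how current Reactor versions appear to work.

```java
import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnFutureDemo {
    // Returns the thread on which the value was observed downstream of subscribeOn.
    static String observedThread() {
        CompletableFuture<String> future = new CompletableFuture<>();

        CompletableFuture<String> observed = Mono.fromFuture(future)
            .subscribeOn(Schedulers.boundedElastic()) // only the subscription hops threads
            .map(value -> Thread.currentThread().getName())
            .toFuture(); // subscribes right away

        // Wait until fromFuture has attached its callback (demo-only synchronization).
        while (future.getNumberOfDependents() == 0) {
            Thread.onSpinWait();
        }

        // A non-Reactor thread completes the future, as the SDK's pool would.
        new Thread(() -> future.complete("item"), "library-thread").start();
        return observed.join();
    }

    public static void main(String[] args) {
        System.out.println(observedThread()); // library-thread, not boundedElastic-N
    }
}
```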

That is why publishOn is the only way to ensure value processing happens on the desired thread.
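Applied to the repository from the question, one possible shape (a sketch, not the only design) is to let findById take the Scheduler and apply publishOn inside. It assumes reactor-core on the classpath; ItemRepository and its asyncLookup function are hypothetical stand-ins for the real repository and the CompletableFuture-returning DynamoDB call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class ItemRepository {
    // Hypothetical async lookup standing in for a CompletableFuture-returning SDK call.
    private final Function<String, CompletableFuture<String>> asyncLookup;

    ItemRepository(Function<String, CompletableFuture<String>> asyncLookup) {
        this.asyncLookup = asyncLookup;
    }

    // The caller decides on which Scheduler the result is processed.
    Mono<String> findById(String id, Scheduler scheduler) {
        return Mono.fromFuture(() -> asyncLookup.apply(id)) // start the call on subscription
            .publishOn(scheduler);                          // observe the value on the caller's Scheduler
    }

    public static void main(String[] args) {
        ItemRepository repo = new ItemRepository(id ->
            CompletableFuture.supplyAsync(() -> "item-" + id));
        String thread = repo.findById("42", Schedulers.boundedElastic())
            .map(item -> Thread.currentThread().getName())
            .block();
        System.out.println(thread); // a name starting with boundedElastic-
    }
}
```

An API handler would pass Schedulers.boundedElastic(), while the Kafka consumer would pass its per-partition Scheduler, and the repository itself stays agnostic.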

Alright, should I use publishOn all the way down?

Yes and no. It depends.

If you use Mono, then in 99% of cases the rule is simple: if you want to make sure that your data processing happens on a particular thread, always use publishOn.

Do not worry about potential overhead; Project Reactor takes care of you even if you used it accidentally. Project Reactor has several optimizations that may replace your publishOn with subscribeOn at runtime (when it is safe to do so without breaking the behavior), so you get the best of both.
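One such optimization can be observed directly. In the reactor-core versions I am aware of, Mono.publishOn checks whether the source is a Callable (as Mono.fromCallable is) and, if so, assembles a subscribeOn-style operator, so the Callable body itself ends up on the Scheduler. Treat this as an implementation detail rather than a contract; the sketch assumes reactor-core on the classpath.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class PublishOnCallableDemo {
    // Returns the name of the thread on which the Callable body itself ran.
    static String callableThread() {
        return Mono.fromCallable(() -> Thread.currentThread().getName())
            // For a Callable source, publishOn may be assembled as a subscribeOn-style
            // operator, so the Callable also runs on the Scheduler.
            .publishOn(Schedulers.boundedElastic())
            .block();
    }

    public static void main(String[] args) {
        System.out.println(callableThread());
    }
}
```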

Part 2. Falling down the rabbit hole of Schedulers

Never ever use Schedulers.immediate()

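It is an almost no-op scheduler, which basically does:

// Roughly what Schedulers.immediate().schedule(runnable) does (simplified)
public Disposable schedule(Runnable task) {
    task.run(); // runs synchronously on the calling thread
    return Disposables.disposed();
}

Right, it does nothing useful for Reactor users, and we use it only for internal needs.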
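This also explains the question's first attempt. The sketch below (assuming reactor-core on the classpath) wraps Schedulers.immediate() in an Executor exactly as in the question and submits a task from a thread with the hypothetical name fake-event-loop; the task simply runs on that submitting thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import reactor.core.scheduler.Schedulers;

public class ImmediateExecutorDemo {
    // Returns the name of the thread on which a task submitted from callerName actually ran.
    static String ranOn(String callerName) {
        // The Executor from the question's first attempt.
        Executor viaImmediate = runnable -> Schedulers.immediate().schedule(runnable);
        CompletableFuture<String> result = new CompletableFuture<>();
        new Thread(
            () -> viaImmediate.execute(() -> result.complete(Thread.currentThread().getName())),
            callerName).start();
        return result.join();
    }

    public static void main(String[] args) {
        System.out.println(ranOn("fake-event-loop")); // fake-event-loop
    }
}
```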

Alright, so how can I use a Scheduler as an Executor in the imperative world?

There are two options:

Fast path: Step by Step guide

1.a) Create your bounded Executor (e.g. Executors.fixed...).
1.b) Create your bounded ScheduledExecutorService if you want the power of periodic and delayed tasks.
2) Create a Scheduler from your executor using the Schedulers.fromExecutorXXX API.
3) Use your bounded Executor in the imperative world, and use your Scheduler, which is a wrapper around that same bounded executor, in the reactive world.
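The steps above can be sketched as follows, assuming reactor-core on the classpath. The pool size of 4 and the app-pool- thread-name prefix are arbitrary, hypothetical choices; Schedulers.fromExecutorService is one of the fromExecutorXXX variants.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class SharedPoolDemo {
    // Returns {thread used imperatively, thread used reactively}.
    static String[] threadsUsed() {
        AtomicInteger counter = new AtomicInteger();
        // 1.a) A bounded executor with recognizable thread names.
        ExecutorService pool = Executors.newFixedThreadPool(4,
            task -> new Thread(task, "app-pool-" + counter.incrementAndGet()));
        // 2) A Scheduler that wraps the very same executor.
        Scheduler scheduler = Schedulers.fromExecutorService(pool);
        try {
            // 3) Imperative world: an Executor-accepting API takes the pool directly.
            String imperative = CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), pool)
                .join();
            // 3) Reactive world: publishOn the Scheduler to process values on that pool.
            String reactive = Mono.fromFuture(CompletableFuture.supplyAsync(() -> "item"))
                .publishOn(scheduler)
                .map(value -> Thread.currentThread().getName())
                .block();
            return new String[] {imperative, reactive};
        } finally {
            // Release the pool threads so the JVM can exit.
            scheduler.dispose();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] used = threadsUsed();
        System.out.println(used[0] + " / " + used[1]); // both names start with app-pool-
    }
}
```

In the question's setting, the same pool would presumably be what gets passed as FUTURE_COMPLETION_EXECUTOR, so SDK completions and reactive processing share one bounded pool; that wiring is left out here because it needs the AWS SDK.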

Long path

Coming soon...

Part 3. How to serialize executions.

Coming soon

Oleh Dokuka answered Sep 17 '22 12:09