Project Reactor provides a great way to define which thread pool code runs on, by defining a Scheduler. It also provides a bridge to libraries that use CompletableFutures, through Mono.fromFuture(..).
AWS's async client for DynamoDB completes the CompletableFutures it returns from its API calls on a java.util.concurrent.Executor. By default, it creates an Executor backed by a thread pool that it also creates. The result is that even streams with a defined Scheduler, like Mono.fromFuture(..).subscribeOn(Schedulers.boundedElastic()), execute on a thread from the pool the library creates instead of one from Schedulers.boundedElastic(). So we're seeing thread names like sdk-async-response-0-2 instead of names like boundedElastic-1.
Fortunately, the library allows us to provide our own Executor, as shown here, so my question is:
How do you build an Executor that uses a thread from the Scheduler defined on that part of the stream at runtime?
Use Case
We have a repository class that has a findById method, and we need the caller to be able to control which Scheduler it runs on, because it's used in these distinctly different contexts:
Schedulers.boundedElastic() scheduler.
Attempts
We've tried defining an Executor using both Schedulers.immediate() and Runnable::run, as shown here, but both result in executing on the Netty event loop thread (example name: aws-java-sdk-NettyEventLoop-0-2), not a thread from the defined Scheduler.
DynamoDbAsyncClient.builder()
    .asyncConfiguration(builder -> builder.advancedOption(
        SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
        runnable -> Schedulers.immediate().schedule(runnable)
    ))
    .build();

DynamoDbAsyncClient.builder()
    .asyncConfiguration(builder -> builder.advancedOption(
        SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
        Runnable::run
    ))
    .build();
Looking into the question, I see the need to observe elements on a particular thread after execution.
To be precise, "observe" in this context means being able to work on a value in the stream on some specific thread. RxJava has an operator named precisely for that, observeOn; in Project Reactor, the identical operation is called publishOn.
Thus, if you want to process data on Schedulers.boundedElastic(), you should use the following construction:
Mono.fromFuture(..)
    .publishOn(Schedulers.boundedElastic())
.subscribeOn worked as well???
Reading the previous construction, you may start worrying, because you are 100% sure that
Mono.fromRunnable(..)
    .subscribeOn(Schedulers.boundedElastic())
sends its signals on the thread boundedElastic-1, so what is wrong with doing the same with fromFuture?
And here comes the trick:
subscribeOn with Futures / CompletableFuture or anything which can use its own async mechanism underneath
If you look at what is going on behind subscribeOn, you will find something like the following:
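// Simplified version of the SubscribeOn operator
Scheduler scheduler;   // the Scheduler passed to subscribeOn(..)
Publisher<T> parent;   // the upstream publisher

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    // subscribe to the upstream from a thread owned by the scheduler
    scheduler.schedule(() -> parent.subscribe(actual));
}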
This basically means that the parent's subscribe method will be called on a separate thread.
Such a technique works for fromRunnable, fromSupplier, and fromCallable because their logic happens in the subscribe method:
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    Operators.MonoSubscriber<T, T> sds = new Operators.MonoSubscriber<>(actual);
    actual.onSubscribe(sds);
    // skipped some parts
    T t = supplier.get(); // the user's logic runs here, on the subscribing thread
    if (t == null) {
        sds.onComplete();
    }
    else {
        sds.complete(t);
    }
}
which means it is almost equivalent to
scheduler.schedule(() -> {
    T t = supplier.get();
    if (t == null) {
        sds.onComplete();
    }
    else {
        sds.complete(t);
    }
});
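As a quick check of the point above, here is a minimal sketch, assuming reactor-core is on the classpath. The class and method names are made up for illustration. Because the supplier body runs inside subscribe, and subscribeOn moves subscribe onto the scheduler, the supplier should report a boundedElastic thread.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class, not part of Reactor.
final class SubscribeOnSupplierDemo {

    // Returns the name of the thread on which the supplier body ran.
    static String supplierThread() {
        return Mono.fromSupplier(() -> Thread.currentThread().getName())
                   .subscribeOn(Schedulers.boundedElastic())
                   .block();
    }

    public static void main(String[] args) {
        // Expected to print a name of the form boundedElastic-N
        System.out.println(supplierThread());
    }
}
```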
In contrast, fromFuture is much trickier.
A short quiz.
On which thread may we observe the value? (Assume the code runs on thread Main and the task executes on the ForkJoinPool.)
var future = CompletableFuture
    .supplyAsync(() -> {
        return value;
    });
... // some code here; it does not matter what, just code
future.thenAccept(value -> {
    System.out.println(Thread.currentThread());
});
And the correct answer.... 🥁🥁🥁🥁🥁🥁
It may be thread Main, or it may be a thread from the ForkJoinPool...
because it is racy. If the value has already been delivered by the time we consume it, we just read a volatile field on the reader thread (thread Main). Otherwise, thread Main just sets an acceptor, and the acceptor is invoked later on the ForkJoinPool thread.
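The two outcomes of the quiz can be made deterministic with plain JDK code. The sketch below is only an illustration: the class name and the worker-1 thread name are made up, and a manually completed future stands in for supplyAsync so that the ordering is under our control.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class using only the JDK.
final class CallbackThreadDemo {

    // Callback registered AFTER completion: it runs on the registering (caller) thread.
    static String alreadyCompleted() {
        CompletableFuture<String> future = CompletableFuture.completedFuture("value");
        AtomicReference<String> seen = new AtomicReference<>();
        future.thenAccept(v -> seen.set(Thread.currentThread().getName()));
        return seen.get();
    }

    // Callback registered BEFORE completion: it runs on the thread that completes the future.
    static String completedLater() {
        CompletableFuture<String> future = new CompletableFuture<>();
        CompletableFuture<String> seen = new CompletableFuture<>();
        future.thenAccept(v -> seen.complete(Thread.currentThread().getName()));
        // The worker starts only after the callback has been registered.
        new Thread(() -> future.complete("value"), "worker-1").start();
        return seen.join();
    }

    public static void main(String[] args) {
        System.out.println(alreadyCompleted()); // the caller thread, e.g. main
        System.out.println(completedLater());   // worker-1
    }
}
```

In real code you do not control which of the two cases you hit, which is exactly the race described above.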
Right, that is why, when you use fromFuture with subscribeOn, there is no guarantee that the subscribeOn thread will observe the value of the given CompletableFuture.
That is why publishOn is the only way to ensure value processing happens on the desired thread.
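A minimal sketch of that guarantee, assuming reactor-core is on the classpath. The class name and the sdk-like-thread name are hypothetical; the manually completed future only stands in for a library call such as the AWS SDK one, which is not used here.

```java
import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class, not part of Reactor or the AWS SDK.
final class PublishOnFutureDemo {

    // Returns the name of the thread on which the value is processed after publishOn.
    static String processingThread() {
        // Stand-in for an SDK call: the future is completed on a library-owned thread.
        CompletableFuture<String> future = new CompletableFuture<>();
        new Thread(() -> future.complete("item"), "sdk-like-thread").start();

        return Mono.fromFuture(future)
                   .publishOn(Schedulers.boundedElastic())
                   // everything below publishOn runs on the chosen scheduler
                   .map(item -> Thread.currentThread().getName())
                   .block();
    }

    public static void main(String[] args) {
        // Expected to print a name of the form boundedElastic-N
        System.out.println(processingThread());
    }
}
```

Whether the future completes before or after subscription, the map step is handed over to the boundedElastic scheduler, so the race from the quiz no longer decides where the value is processed.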
publishOn all the way down???
Yes and no. It depends.
If you use Mono, in 99% of cases you may use publishOn. If you want to make sure that your data processing happens on a particular thread, always use publishOn.
Do not worry about potential overhead; Project Reactor takes care of you even if you used it accidentally. Project Reactor has several optimizations which may replace your publishOn with subscribeOn at runtime (when it is safe and does not break the behavior), so you will get the best.
Schedulers
Schedulers.immediate()
It is almost a no-op scheduler, which basically does
Schedulers.immediate().schedule(runnable) {
    runnable.run()
}
Right, it does nothing useful for Reactor users, and we use it only for internal needs.
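The same effect can be reproduced with the JDK alone, which may help explain the Netty event-loop thread names seen in the question. In this sketch the class name and the io-thread name are made up, and the thread merely stands in for whichever library thread hands the completion task to the configured Executor (an assumption about the SDK's behavior, inferred from the thread names reported in the question). A direct executor such as Runnable::run simply runs the task on the thread that calls execute.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical demo class using only the JDK.
final class DirectExecutorDemo {

    // Returns the name of the thread on which the given executor ran the task.
    static String callbackThread(Executor executor) {
        CompletableFuture<String> seen = new CompletableFuture<>();
        // Stand-in for a library I/O thread submitting a completion task.
        new Thread(
            () -> executor.execute(() -> seen.complete(Thread.currentThread().getName())),
            "io-thread").start();
        return seen.join();
    }

    public static void main(String[] args) {
        // A direct executor never switches threads.
        System.out.println(callbackThread(Runnable::run)); // io-thread
    }
}
```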
There are two options:
1.a) Create your bounded Executor (e.g. Executors.fixed...)
1.b) Create your bounded ScheduledExecutorService if you want the power of periodic and delayed tasks
2) Create a Scheduler from your executor using the Schedulers.fromExecutorXXX API
3) Use your bounded Executor in the imperative world, and use your Scheduler, which is a wrapper around the bounded one, in the reactive world
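The steps above can be sketched roughly as follows, assuming reactor-core is on the classpath. The pool size, the repo-pool- thread prefix, and the class name are all hypothetical, and Executors.newFixedThreadPool with Schedulers.fromExecutorService is just one possible choice for steps 1.a and 2.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class; names and sizes are illustrative only.
final class SharedExecutorDemo {

    // Returns {thread used by imperative code, thread used by reactive code}.
    static String[] threadNames() {
        AtomicInteger counter = new AtomicInteger();
        // Step 1.a: a bounded executor with recognizable thread names.
        ExecutorService executor = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, "repo-pool-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        });
        // Step 2: wrap the same executor in a Scheduler.
        Scheduler scheduler = Schedulers.fromExecutorService(executor);
        try {
            // Step 3, imperative world: hand the executor to future-based code.
            String imperative = CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), executor)
                .join();
            // Step 3, reactive world: use the Scheduler wrapper.
            String reactive = Mono.just("item")
                .publishOn(scheduler)
                .map(item -> Thread.currentThread().getName())
                .block();
            return new String[] {imperative, reactive};
        } finally {
            scheduler.dispose();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        for (String name : threadNames()) {
            System.out.println(name); // both names start with repo-pool-
        }
    }
}
```

In the question's setup, the same executor could presumably also be passed as the FUTURE_COMPLETION_EXECUTOR option, so that the SDK and the reactive pipeline share one bounded pool.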