I am just beginning to explore Project Reactor and its abstractions Mono and Flux, and would like to understand the basic differences from the bare-bones Java 8 CompletableFuture.
Here is the simple code I have:
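    import java.util.concurrent.CompletableFuture;
    import reactor.core.publisher.Mono;

    public class Main { // class name is illustrative; the original snippet omitted it

        public static void main(String[] args) throws Exception {
            // Reactor pipeline: no scheduler is specified anywhere
            Mono.fromCallable(() -> getData())
                .map(s -> s + " World ")
                .subscribe(s -> System.out.println(s));

            // CompletableFuture: the supplier is handed to the common ForkJoinPool
            CompletableFuture.supplyAsync(() -> getData())
                .thenAccept(System.out::println);

            System.out.println(Thread.currentThread() + " End ");
        }

        // Simulates a long-running computation and reports the executing thread
        private static String getData() {
            int j = 0;
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                j = j - i % 2;
            }
            System.out.println(Thread.currentThread() + " - " + j);
            return " Hello ";
        }
    }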
Firstly, no surprises with the CompletableFuture. supplyAsync schedules the function for execution via the ForkJoinPool, the "End" line prints immediately, and the program terminates because the main thread is really short-lived here. As expected.
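The CompletableFuture side of this can be checked in isolation. The following is a minimal sketch using only the JDK (the class and method names are hypothetical, not from the question): it reports which thread ran the supplier and uses join() so the caller waits for the result. Without such a wait, the common pool's daemon worker threads do not keep the JVM alive, which is why the program in the question can exit before the asynchronous task prints anything.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; names are illustrative only.
class SupplyAsyncDemo {

    // Returns the name of the thread that actually executed the supplier.
    static String workerThreadName() {
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> Thread.currentThread().getName());
        // join() blocks the caller until the result is available, so the
        // JVM cannot exit before the asynchronous task has completed.
        return future.join();
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("worker: " + workerThreadName());
    }
}
```

The two printed names should differ, since supplyAsync without an explicit executor hands the work to a pool thread rather than running it on the caller.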
But the Mono.fromCallable(...) blocks the main thread there. Also, the thread name that gets printed in the getData() function is the main thread. So I see sequential/blocking behavior rather than sequential/non-blocking (async) behavior. Is it because I had applied a subscribe function on the same thread, it is blocking? Can someone explain this, please?
A Reactive Streams Publisher with basic rx operators that emits at most one item via the onNext signal then terminates with an onComplete signal (successful Mono, with or without value), or only emits a single onError signal (failed Mono). Most Mono implementations are expected to immediately call Subscriber.
Ans. A CompletableFuture represents one result of an asynchronous call, while Reactive Streams is a pattern for pushing N messages synchronously/asynchronously through a system. CompletableFuture doesn't address the elasticity requirement of the Reactive Manifesto as it does not handle backpressure.
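To make the backpressure point concrete, here is a small sketch that uses the JDK's own java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams interfaces, rather than Reactor itself. The class names are hypothetical. The subscriber requests one item at a time, so the publisher never hands over more than the subscriber has asked for; CompletableFuture has no equivalent demand signal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical demo class; names are illustrative only.
class BackpressureDemo {

    // Subscriber that signals demand for exactly one item at a time.
    static final class OneAtATime implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);            // initial demand: one item
        }
        @Override public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ready for the next item
        }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    static List<Integer> run() {
        OneAtATime subscriber = new OneAtATime();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i); // would block if the subscriber's buffer were full
            }
        } // close() sends onComplete once the submitted items are delivered
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```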
A Flux object represents a reactive sequence of 0..N items, while a Mono object represents a single-value-or-empty (0..1) result. This distinction carries a bit of semantic information into the type, indicating the rough cardinality of the asynchronous processing.
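A Mono is usually described as an asynchronous call that executes in a non-blocking way, but whether it actually runs asynchronously depends on the operators and Schedulers used in the pipeline.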
Is it because I had applied a subscribe function on the same thread, it is blocking?
This is exactly what seems to happen.
This specific behavior surprises me a little, since it is not the way most pipelines behave. Most pipelines contain, one way or another, some operation that makes the pipeline async. publishOn and subscribeOn are the obvious examples, but a flatMap might also have such an effect, and probably many others. In those cases, subscribe will return immediately.
This hints at a very important point about reactive programming, though: pipelines should not contain long blocking calls. A reactive pipeline is intended to be prepared up front and, once subscribed to, to process events without blocking. Blocking statements therefore have the very real potential of blocking the whole execution. With a Scheduler you can confine such calls to dedicated thread pools and thereby control their effect.
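As a minimal sketch of this, assuming reactor-core 3.3 or later is on the classpath (the class and method names are hypothetical), subscribeOn moves the callable onto a Scheduler's worker thread, while the same pipeline without a Scheduler runs on whichever thread subscribes:

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class; names are illustrative only.
class SubscribeOnDemo {

    // Stand-in for the question's long-running getData():
    // it simply reports the thread it runs on.
    static String getData() {
        return Thread.currentThread().getName();
    }

    // No scheduler: the callable runs on the subscribing (calling) thread.
    static String onCallerThread() {
        return Mono.fromCallable(SubscribeOnDemo::getData).block();
    }

    // subscribeOn: the callable runs on a boundedElastic worker thread,
    // a scheduler intended for blocking or long-running work.
    static String onSchedulerThread() {
        return Mono.fromCallable(SubscribeOnDemo::getData)
                .subscribeOn(Schedulers.boundedElastic())
                .block();
    }

    public static void main(String[] args) {
        System.out.println("caller:    " + Thread.currentThread().getName());
        System.out.println("default:   " + onCallerThread());
        System.out.println("scheduled: " + onSchedulerThread());
    }
}
```

block() is used here only so the demo can return its result deterministically. In the question's code, keeping subscribe(...) and adding subscribeOn(...) before it would let main continue immediately; because Reactor's default schedulers use daemon threads, the program may then exit before the result is printed unless something waits for it.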