
Reactor Mono vs CompletableFuture

I am just beginning to explore Project Reactor and its abstractions Mono and Flux, and would like to understand the basic differences from the bare-bones Java 8 CompletableFuture.

Here is a simple piece of code I have:

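import java.util.concurrent.CompletableFuture;

import reactor.core.publisher.Mono;

public class MonoVsCompletableFuture {

    public static void main(String[] args) throws Exception {

        // Reactor: build a Mono from a callable, transform the value, then subscribe
        Mono.fromCallable(() -> getData())
                .map(s -> s + " World ")
                .subscribe(s -> System.out.println(s));

        // JDK: submit the same work through CompletableFuture
        CompletableFuture.supplyAsync(() -> getData())
                .thenAccept(System.out::println);

        System.out.println(Thread.currentThread() + " End ");
    }

    // CPU-bound busy loop that simulates a long-running call
    private static String getData() {

        int j = 0;

        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            j = j - i % 2;
        }

        // Print the executing thread so it is visible where the work ran
        System.out.println(Thread.currentThread() + " - " + j);
        return " Hello ";
    }
}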

First, no surprises with CompletableFuture: supplyAsync schedules the function for execution on the ForkJoinPool, the "End" line prints immediately, and the program terminates because the main thread is very short-lived here. As expected.

But Mono.fromCallable(...) blocks the main thread. Also, the thread name printed in getData() is that of the main thread. So I see sequential, blocking behavior rather than sequential, non-blocking (async) behavior. Is it because I had applied a subscribe function on the same thread, it is blocking? Can someone explain this, please?

user1189332 asked Nov 30 '17 10:11




1 Answer

Is it because I had applied a subscribe function on the same thread, it is blocking?

Yes, this is exactly what happens: nothing in the pipeline switches threads, so the callable runs on the thread that calls subscribe.

This specific behavior surprises me a little, since it is not how most pipelines behave. Most pipelines contain, one way or another, some operation that makes them asynchronous. publishOn and subscribeOn are the obvious examples, but a flatMap can have the same effect when its inner publisher is asynchronous, and probably many other operators can too. In those cases, subscribe returns immediately.
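The underlying idea, that work runs on the calling thread unless something hands it to another thread, can be reproduced with JDK classes alone. The sketch below is only an analogy and not how Reactor is implemented: the class name CallerThreadDemo and its helper methods are made up for illustration, and the same-thread Executor (Runnable::run) plays the role of a pipeline without any scheduler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class CallerThreadDemo {

    // Runs a trivial supplier through the given executor and
    // returns the name of the thread that actually executed it.
    static String runOn(Executor executor) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), executor)
                .join(); // wait for the result so the demo is deterministic
    }

    // Same as above, but with supplyAsync's default executor.
    static String runOnDefaultPool() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        // An Executor that runs each task directly on the calling thread
        Executor sameThread = Runnable::run;

        System.out.println("caller:       " + Thread.currentThread().getName());
        System.out.println("same-thread:  " + runOn(sameThread));
        System.out.println("default pool: " + runOnDefaultPool());
    }
}
```

With the same-thread executor, the supplier reports the caller's thread, much like the Mono in the question; with the default executor it reports a different thread. In both cases the API type is the same, and only the place where the work is scheduled changes.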

This hints at a very important point about reactive programming, though: pipelines should not contain long blocking calls. A reactive pipeline is meant to be assembled up front and then, once subscribed to, to process events without blocking. A blocking call therefore has the very real potential to stall the whole execution. With a Scheduler you can confine such calls to dedicated thread pools and thereby control their effect.
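A minimal sketch of confining the call to a Scheduler, assuming reactor-core 3.3 or later is on the classpath (for Schedulers.boundedElastic()). The class name SubscribeOnDemo and the helper workerThreadName are hypothetical, and getData() here is a trivial stand-in for the long-running method in the question.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class SubscribeOnDemo {

    // Stand-in for the question's getData(): reports which thread ran it.
    static String getData() {
        return Thread.currentThread().getName();
    }

    // Subscribes with the callable moved onto a Scheduler and returns
    // the name of the thread that executed it (null on timeout or error).
    static String workerThreadName() throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        String[] seen = new String[1];

        Mono.fromCallable(SubscribeOnDemo::getData)
                .subscribeOn(Schedulers.boundedElastic()) // run the callable off the caller thread
                .subscribe(name -> {
                    seen[0] = name;
                    done.countDown();
                });

        // subscribe() has already returned here; wait for the worker to finish
        done.await(5, TimeUnit.SECONDS);
        return seen[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("worker: " + workerThreadName());
    }
}
```

The latch is there only so that the demo waits for the value before reading it; in an application the subscriber would normally just react to the value when it arrives.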

Jens Schauder answered Sep 20 '22 11:09