Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Reactor compose vs flatMap

I continue playing with Reactor, and now I see compose operator that behave exactly like flatMap and I´m wondering if there´s any difference that I don't understand.

    @Test
public void compose() throws InterruptedException {
    Scheduler mainThread = Schedulers.single();
    Flux.just(("old element"))
            .compose(element ->
                    Flux.just("new element in new thread")
                            .subscribeOn(mainThread)
                            .doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName())))
            .doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName()))
            .subscribe(System.out::println);
    Thread.sleep(1000);
}

@Test
public void flatMapVsCompose() throws InterruptedException {
    Scheduler mainThread = Schedulers.single();
    Flux.just(("old element"))
            .flatMap(element ->
                    Flux.just("new element in new thread")
                            .subscribeOn(mainThread)
                            .doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName())))
            .doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName()))
            .subscribe(System.out::println);
    Thread.sleep(1000);
}

This two examples behave and return the same result.

Regards.

like image 639
paul Avatar asked Jan 28 '18 10:01

paul


People also ask

What is flatMap in reactive programming?

The FlatMap operator transforms an Observable by applying a function that you specify to each item emitted by the source Observable, where that function returns an Observable that itself emits items. FlatMap then merges the emissions of these resulting Observables, emitting these merged results as its own sequence.

Is flatMap sequential?

No, a flatMap is not sequential. A flatMap subscribes eagerly to the inner streams. Here is a summary of how it works: Receive an event from upstream.

What is compose in RxJava?

RxJava compose is use to transform a data through a series of operator and we should not break a chain for it.

How do you convert flux to Mono?

flatMap() operator We use flatMap() when the transformation returns a Flux or Mono. The flatMap() flattens the result and extracts the data from the Mono. So we should use it when we know that we will have one of the Reactive Types as the data source. That would be all regarding how to transform Flux and Mono in Java.


2 Answers

Explanation from @Andrew is pretty good. Just wanted to add an example for better understanding.

Flux.just("1", "2")
        .compose( stringFlux -> {
            System.out.println("In compose"); // It takes whe whole Flux as input
           return stringFlux.collectList();
        }).subscribe(System.out::println);


Flux.just("1", "2").flatMap(s -> { //Input to the anonymous function is individual items in stream
            System.out.println("In flatMap");
            return Flux.just(Integer.parseInt(s));
        }).subscribe(System.out::println);

This produces the output

In compose
[1, 2]
In flatMap
1
In flatMap
2

Which indicates compose works on entire stream but flatMap works on individual items in the stream

like image 110
pvpkiran Avatar answered Oct 17 '22 21:10

pvpkiran


An excellent explanation by Dan Lew:

The difference is that compose() is a higher level abstraction: it operates on the entire stream, not individually emitted items. In more specific terms:

  • compose() is the only way to get the original Observable<T> from the stream. Therefore, operators that affect the whole stream (like subscribeOn() and observeOn()) need to use compose().

    In contrast, if you put subscribeOn()/observeOn() in flatMap(), it would only affect the Observable you create in flatMap() but not the rest of the stream.

  • compose() executes immediately when you create the Observable stream, as if you had written the operators inline. flatMap() executes when its onNext() is called, each time it is called. In other words, flatMap() transforms each item, whereas compose() transforms the whole stream.

  • flatMap() is necessarily less efficient because it has to create a new Observable every time onNext() is called. compose() operates on the stream as it is. If you want to replace some operators with reusable code, use compose(). flatMap() has many uses but this is not one of them.

like image 28
Andrew Tobilko Avatar answered Oct 17 '22 19:10

Andrew Tobilko