I continue playing with Reactor, and now I see compose
operator that behave exactly like flatMap
and I´m wondering if there´s any difference that I don't understand.
@Test
public void compose() throws InterruptedException {
Scheduler mainThread = Schedulers.single();
Flux.just(("old element"))
.compose(element ->
Flux.just("new element in new thread")
.subscribeOn(mainThread)
.doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName())))
.doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName()))
.subscribe(System.out::println);
Thread.sleep(1000);
}
@Test
public void flatMapVsCompose() throws InterruptedException {
Scheduler mainThread = Schedulers.single();
Flux.just(("old element"))
.flatMap(element ->
Flux.just("new element in new thread")
.subscribeOn(mainThread)
.doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName())))
.doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName()))
.subscribe(System.out::println);
Thread.sleep(1000);
}
This two examples behave and return the same result.
Regards.
The FlatMap operator transforms an Observable by applying a function that you specify to each item emitted by the source Observable, where that function returns an Observable that itself emits items. FlatMap then merges the emissions of these resulting Observables, emitting these merged results as its own sequence.
No, a flatMap is not sequential. A flatMap subscribes eagerly to the inner streams. Here is a summary of how it works: Receive an event from upstream.
RxJava compose is use to transform a data through a series of operator and we should not break a chain for it.
flatMap() operator We use flatMap() when the transformation returns a Flux or Mono. The flatMap() flattens the result and extracts the data from the Mono. So we should use it when we know that we will have one of the Reactive Types as the data source. That would be all regarding how to transform Flux and Mono in Java.
Explanation from @Andrew is pretty good. Just wanted to add an example for better understanding.
Flux.just("1", "2")
.compose( stringFlux -> {
System.out.println("In compose"); // It takes whe whole Flux as input
return stringFlux.collectList();
}).subscribe(System.out::println);
Flux.just("1", "2").flatMap(s -> { //Input to the anonymous function is individual items in stream
System.out.println("In flatMap");
return Flux.just(Integer.parseInt(s));
}).subscribe(System.out::println);
This produces the output
In compose
[1, 2]
In flatMap
1
In flatMap
2
Which indicates compose
works on entire stream but flatMap
works on individual items in the stream
An excellent explanation by Dan Lew:
The difference is that compose()
is a higher level abstraction: it operates on the entire stream, not individually emitted items. In more specific terms:
compose()
is the only way to get the original Observable<T>
from the stream. Therefore, operators that affect the whole stream (like subscribeOn()
and observeOn()
) need to use compose()
.
In contrast, if you put subscribeOn()
/observeOn()
in flatMap()
, it would only affect the Observable
you create in flatMap()
but not the rest of the stream.
compose()
executes immediately when you create the Observable
stream, as if you had written the operators inline. flatMap()
executes when its onNext()
is called, each time it is called. In other words, flatMap()
transforms each item, whereas compose()
transforms the whole stream.
flatMap()
is necessarily less efficient because it has to create a new Observable
every time onNext()
is called. compose()
operates on the stream as it is. If you want to replace some operators with reusable code, use compose()
. flatMap()
has many uses but this is not one of them.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With