RxJava is a Java library that enables Functional Reactive Programming in Android development. It raises the level of abstraction around threading in order to simplify the implementation of complex concurrent behavior.
RxJava provides a standard workflow for managing data and events across an application: create an Observable, give the Observable some data to emit, create an Observer, and subscribe the Observer to the Observable. RxJava is becoming increasingly popular, particularly among Android developers.
It is as described in the documentation you quoted: merge can interleave the outputs, while concat waits for earlier streams to finish before processing later ones. In your case, with single-element, static streams, it makes no real difference (though in theory merge could output the words in any order and still be valid according to the spec). To see the difference, try the following (you will need to add a sleep afterwards to avoid an early exit):
Observable.merge(
        Observable.interval(1, TimeUnit.SECONDS).map(id -> "A" + id),
        Observable.interval(1, TimeUnit.SECONDS).map(id -> "B" + id))
    .subscribe(System.out::println);
A0 B0 A1 B1 B2 A2 B3 A3 B4 A4
versus
Observable.concat(
        Observable.interval(1, TimeUnit.SECONDS).map(id -> "A" + id),
        Observable.interval(1, TimeUnit.SECONDS).map(id -> "B" + id))
    .subscribe(System.out::println);
A0 A1 A2 A3 A4 A5 A6 A7 A8
Concat will never start printing B, because stream A never finishes.
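To make the timing argument concrete without wall-clock sleeps, here is a small JDK-only model (it is not RxJava itself). It treats each source as a list of emissions tagged with a delay relative to subscription time. The names `TimelineModel`, `Emission`, `interval`, `merge`, `concat`, and `render` are hypothetical helpers invented for this sketch, and each source is cut to three items, as `take(3)` would do, so that the concat case can finish A and move on to B.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical JDK-only model of merge vs concat timing; not RxJava's implementation.
class TimelineModel {
    /** One emission: time in ms since subscription, plus the value. */
    static final class Emission {
        final long atMs;
        final String value;

        Emission(long atMs, String value) {
            this.atMs = atMs;
            this.value = value;
        }
    }

    /** Models interval(1, SECONDS).map(id -> prefix + id), cut to `count` items. */
    static List<Emission> interval(String prefix, int count) {
        List<Emission> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            out.add(new Emission(1000L * (i + 1), prefix + i));
        }
        return out;
    }

    /** merge: every source is subscribed at t=0, so items come out in arrival order. */
    static List<Emission> merge(List<List<Emission>> sources) {
        List<Emission> all = new ArrayList<>();
        for (List<Emission> source : sources) {
            all.addAll(source);
        }
        // List.sort is stable, so simultaneous items keep source order in this model.
        // Real RxJava makes no promise about the order of such ties.
        all.sort(Comparator.comparingLong(e -> e.atMs));
        return all;
    }

    /** concat: the next source is subscribed only after the previous one completes. */
    static List<Emission> concat(List<List<Emission>> sources) {
        List<Emission> all = new ArrayList<>();
        long offset = 0;
        for (List<Emission> source : sources) {
            for (Emission e : source) {
                all.add(new Emission(offset + e.atMs, e.value));
            }
            // Simplification: a source completes together with its last item.
            if (!all.isEmpty()) {
                offset = all.get(all.size() - 1).atMs;
            }
        }
        return all;
    }

    /** Renders emissions as "value@timeMs" separated by spaces. */
    static String render(List<Emission> emissions) {
        StringBuilder sb = new StringBuilder();
        for (Emission e : emissions) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(e.value).append('@').append(e.atMs);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<List<Emission>> sources = List.of(interval("A", 3), interval("B", 3));
        // prints: A0@1000 B0@1000 A1@2000 B1@2000 A2@3000 B2@3000
        System.out.println(render(merge(sources)));
        // prints: A0@1000 A1@2000 A2@3000 B0@4000 B1@5000 B2@6000
        System.out.println(render(concat(sources)));
    }
}
```

In the model, B0 only appears at 4000 ms under concat because B's clock does not start until A has completed. With an unbounded A, as in the original snippet, that moment never arrives.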
s/stream/observable/g ;)
The documentation gives nice diagrams to show the difference. Remember that merge gives no guarantee of interleaving items one by one; that is just one possible outcome.
[Marble diagram: Concat]
[Marble diagram: Merge]
Concat
Concat emits the emissions from two or more Observables without interleaving them. It will maintain the order of the observables while emitting the items. It means that it will emit all the items of the first observable and then it will emit all the items of the second observable and so on.
Let's understand it clearly by an example.
final String[] listFirst = {"A1", "A2", "A3", "A4"};
final String[] listSecond = {"B1", "B2", "B3"};
final Observable<String> observableFirst = Observable.fromArray(listFirst);
final Observable<String> observableSecond = Observable.fromArray(listSecond);
Observable.concat(observableFirst, observableSecond)
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onNext(String value) {
            // Called once per item, in emission order.
            System.out.println(value);
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    });
As we are using Concat Operator, it will maintain the order and emit the values as A1, A2, A3, A4, B1, B2, B3.
Merge
Merge combines multiple Observables into one by merging their emissions. It does not guarantee any ordering between items from different sources, although each source's own items still arrive in their original relative order.
Let's understand it clearly by an example.
final String[] listFirst = {"A1", "A2", "A3", "A4"};
final String[] listSecond = {"B1", "B2", "B3"};
final Observable<String> observableFirst = Observable.fromArray(listFirst);
final Observable<String> observableSecond = Observable.fromArray(listSecond);
Observable.merge(observableFirst, observableSecond)
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onNext(String value) {
            // Called once per item, in arrival order.
            System.out.println(value);
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    });
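As we are using the Merge operator, ordering between the two sources is not guaranteed, so in general the values can interleave, for example A1, B1, A2, A3, B2, B3, A4 or A1, A2, B1, B2, A3, A4, B3. Note that with synchronous sources such as fromArray, merge subscribes to each source in turn, so in practice this particular snippet emits A1, A2, A3, A4, B1, B2, B3; interleaving only shows up when the sources emit asynchronously.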
This is how we should use the Concat and the Merge operators in RxJava depending on our use-case.
If both sources are synchronous, as the OP stated, merge and concat both return exactly the same order.
But when the sources are not synchronous, for instance a database and a socket that push data with random or undetermined delays, the order of the resulting data differs between merge and concat.
fun <T> Observable<T>.getAsyncItems(): Observable<T> {
    return concatMap {
        val delay = Random().nextInt(10)
        Observable.just(it)
            .delay(delay.toLong(), TimeUnit.MILLISECONDS)
    }
}
The extension function above adds a random delay to each emission, but the items are always emitted in the same order. If the data is A, B, C, D, and E, then the output is always A, B, C, D, and E.
Now let's compare how merge and concat behave when the sources are async.
val source1 =
    Observable.just("A", "B", "C", "D", "E").getAsyncItems()
val source2 =
    Observable.just(1, 2, 3, 4, 5).getAsyncItems()
Observable.merge(source1, source2).subscribe { print("$it-") }
Prints something like A-B-1-C-2-D-3-4-E-5- or A-1-2-3-B-C-D-4-E-5-, depending on the random delays.
Observable.concat(source1, source2).subscribe {print("$it-")}
Prints A-B-C-D-E-1-2-3-4-5- no matter how many times it runs.
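The behaviour described above can also be checked as a property rather than by eyeballing runs. The following is a JDK-only sketch (again a model, not RxJava): `asyncItems` imitates the cumulative random delays of getAsyncItems, and `AsyncOrderModel`, `Emission`, `merge`, `concat`, and `only` are hypothetical names made up for this illustration. It assumes merge delivers items in arrival order and concat delivers the first source in full before the second.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Random;

// Hypothetical JDK-only model of random-delay sources; not RxJava's implementation.
class AsyncOrderModel {
    /** One emission: time in ms since subscription, plus the value. */
    static final class Emission {
        final long atMs;
        final String value;

        Emission(long atMs, String value) {
            this.atMs = atMs;
            this.value = value;
        }
    }

    /** Models getAsyncItems(): each item arrives 0-9 ms after the previous one. */
    static List<Emission> asyncItems(Random random, String... items) {
        List<Emission> out = new ArrayList<>();
        long clock = 0;
        for (String item : items) {
            clock += random.nextInt(10);
            out.add(new Emission(clock, item));
        }
        return out;
    }

    /** merge: both sources run at once, so output follows arrival time. */
    static List<String> merge(List<Emission> first, List<Emission> second) {
        List<Emission> all = new ArrayList<>(first);
        all.addAll(second);
        all.sort(Comparator.comparingLong(e -> e.atMs)); // stable sort
        List<String> values = new ArrayList<>();
        for (Emission e : all) {
            values.add(e.value);
        }
        return values;
    }

    /** concat: the second source is not subscribed until the first completes. */
    static List<String> concat(List<Emission> first, List<Emission> second) {
        List<String> values = new ArrayList<>();
        for (Emission e : first) {
            values.add(e.value);
        }
        for (Emission e : second) {
            values.add(e.value);
        }
        return values;
    }

    /** Keeps only the values that belong to one source, preserving their order. */
    static List<String> only(List<String> values, List<String> source) {
        List<String> kept = new ArrayList<>(values);
        kept.retainAll(source);
        return kept;
    }

    public static void main(String[] args) {
        Random random = new Random(); // unseeded, so the merge line varies per run
        List<Emission> letters = asyncItems(random, "A", "B", "C", "D", "E");
        List<Emission> digits = asyncItems(random, "1", "2", "3", "4", "5");
        System.out.println("merge:  " + String.join("-", merge(letters, digits)));
        System.out.println("concat: " + String.join("-", concat(letters, digits)));
    }
}
```

Whatever the delays turn out to be, filtering the merged output down to the letters still yields A, B, C, D, E, and the same holds for the digits: merge may shuffle the two sources against each other, but not a source against itself.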