
Concat VS Merge operator


It is as described in the documentation you quoted: merge can interleave the outputs, while concat waits for each earlier stream to finish before processing the next one. In your case, with single-element, static streams, it makes no real difference (though in theory merge could output the words in any order and still conform to the spec). To see the difference, try the following (you will need to add a sleep afterwards to keep the program from exiting early):

    Observable.merge(
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "A" + id),
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "B" + id))
    .subscribe(System.out::println);

A0 B0 A1 B1 B2 A2 B3 A3 B4 A4

versus

    Observable.concat(
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "A" + id),
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "B" + id))
    .subscribe(System.out::println);

A0 A1 A2 A3 A4 A5 A6 A7 A8

Concat will never start printing B, because stream A never finishes.
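
The same "first source must finish" rule can be reproduced without RxJava. The sketch below is only an analogy: it uses the pull-based java.util.stream API rather than Observables, and endless, unboundedFirst and boundedFirst are hypothetical names made up for illustration. It shows that an endless first source starves the second one, and that bounding the first source (roughly the role take(n) plays in RxJava) lets the concatenation move on to B.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ConcatOrderingSketch {

    // Hypothetical helper: an endless labelled sequence, e.g. A0, A1, A2, ...
    static Stream<String> endless(String label) {
        return Stream.iterate(0, i -> i + 1).map(i -> label + i);
    }

    // The first source never completes, so the concatenation never reaches B.
    static List<String> unboundedFirst() {
        return Stream.concat(endless("A"), endless("B"))
                .limit(5)
                .collect(Collectors.toList());
    }

    // Bounding the first source lets it complete, so B gets its turn.
    static List<String> boundedFirst() {
        return Stream.concat(endless("A").limit(3), endless("B"))
                .limit(5)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(unboundedFirst());
        System.out.println(boundedFirst());
    }
}
```

Here unboundedFirst() returns [A0, A1, A2, A3, A4], while boundedFirst() returns [A0, A1, A2, B0, B1].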

s/stream/observable/g ;)

The documentation has nice diagrams that show the difference. Remember that merge gives no guarantee of interleaving items one by one; that is just one possible outcome.

Concat

[Diagram: Concat operator]

Merge

[Diagram: Merge operator]


Concat

Concat emits the emissions from two or more Observables without interleaving them. It will maintain the order of the observables while emitting the items. It means that it will emit all the items of the first observable and then it will emit all the items of the second observable and so on.

Concat Operator

Let's understand it clearly by an example.

final String[] listFirst = {"A1", "A2", "A3", "A4"};
final String[] listSecond = {"B1", "B2", "B3"};

final Observable<String> observableFirst = Observable.fromArray(listFirst);
final Observable<String> observableSecond = Observable.fromArray(listSecond);

Observable.concat(observableFirst, observableSecond)
        .subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String value) {
                // Prints A1, A2, A3, A4, B1, B2, B3, in that order.
                System.out.println(value);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

As we are using Concat Operator, it will maintain the order and emit the values as A1, A2, A3, A4, B1, B2, B3.

Merge

Merge combines multiple Observables into one by merging their emissions. It does not preserve the order between the source Observables, although each source's own items still arrive in their original order.

Merge Operator

Let's understand it clearly by an example.

final String[] listFirst = {"A1", "A2", "A3", "A4"};
final String[] listSecond = {"B1", "B2", "B3"};

final Observable<String> observableFirst = Observable.fromArray(listFirst);
final Observable<String> observableSecond = Observable.fromArray(listSecond);

Observable.merge(observableFirst, observableSecond)
        .subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String value) {
                // Print each value as it arrives.
                System.out.println(value);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

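As we are using the Merge operator, the order across sources is not guaranteed. With asynchronous sources the values can arrive interleaved, for example A1, B1, A2, A3, B2, B3, A4 or A1, A2, B1, B2, A3, A4, B3. These two array-backed sources emit synchronously, though, so in practice this particular snippet emits A1, A2, A3, A4, B1, B2, B3.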

This is how we should use the Concat and the Merge operators in RxJava depending on our use-case.


If both sources are synchronous, as the OP stated, merge and concat return the items in exactly the same order.

But when the sources are not synchronous, for instance a database and a socket that push data with random or undetermined delays, the order of the data differs between merge and concat.

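fun <T> Observable<T>.getAsyncItems(): Observable<T> {
    // concatMap subscribes to one inner Observable at a time, so the source order is preserved.
    return concatMap {
        // Random delay of 0 to 9 ms for each item.
        val delay = Random().nextInt(10)
        Observable.just(it)
                .delay(delay.toLong(), TimeUnit.MILLISECONDS)
    }
}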

The extension function above adds a random delay to each emission, but the items are always emitted in the same order. If the data is A, B, C, D, and E, then the output is always A, B, C, D, and E.

Now let's compare how merge and concat behave when sources are async.

val source1 =
        Observable.just("A", "B", "C", "D", "E").getAsyncItems()
val source2 =
        Observable.just(1, 2, 3, 4, 5).getAsyncItems()

Observable.merge(source1, source2).subscribe {print("$it-")}

Prints something like A-B-1-C-2-D-3-4-E-5- or A-1-2-3-B-C-D-4-E-5-, depending on the random delays.

Observable.concat(source1, source2).subscribe {print("$it-")}

Prints A-B-C-D-E-1-2-3-4-5- no matter how many times it runs.
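
To make the comparison above concrete without any library, here is a minimal plain-Java model of the two strategies. It is not how RxJava implements these operators: emit, concatModel and mergeModel are hypothetical helpers, and threads with random sleeps stand in for the delayed sources. The point is only that running the sources one after another fixes the overall order, while running them at the same time preserves just the order within each source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

class AsyncOrderingModel {

    // Appends each item to out after a random 0-9 ms pause, keeping the source's own order.
    static void emit(List<String> items, List<String> out) {
        for (String item : items) {
            try {
                Thread.sleep(ThreadLocalRandom.current().nextInt(10));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            out.add(item);
        }
    }

    // concat model: the second source starts only after the first has finished.
    static List<String> concatModel(List<String> first, List<String> second) {
        List<String> out = new ArrayList<>();
        emit(first, out);
        emit(second, out);
        return out;
    }

    // merge model: both sources run at the same time and share one output list.
    static List<String> mergeModel(List<String> first, List<String> second) {
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        Thread a = new Thread(() -> emit(first, out));
        Thread b = new Thread(() -> emit(second, out));
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> letters = Arrays.asList("A", "B", "C", "D", "E");
        List<String> digits = Arrays.asList("1", "2", "3", "4", "5");
        System.out.println(concatModel(letters, digits)); // always letters first, then digits
        System.out.println(mergeModel(letters, digits));  // interleaving varies from run to run
    }
}
```

In the merge model the letters still appear in the order A to E and the digits in the order 1 to 5; only the way the two sequences interleave changes between runs.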