Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava / RxJs: How to merge two source observables but complete as soon as one of them completes

I have two source observables. I would like to merge the two source observables, but the merged observable sould complete as soon as one of the source observables completes.

Desired behavior:

Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------x
"merged"  ---1---2----3--4--x

In case of an error on one of the sources, the error should propagate to the merged observable:

Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------e
"merged"  ---1---2----3--4--ex

The "merge" operator only completes the merged stream when both sources have completed:

Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------x
"merged"  ---1---2----3--4-----------------------------x

How can I achieve my desired behavior?

like image 719
jbandi Avatar asked Dec 19 '17 14:12

jbandi


1 Answers

You need to work with the metadata, information about each observable. To do this, use the materialize() operator on each stream and the use dematerialize() on the merged stream to actually emit the data.

Observable.merge( observableA.materialize(),
                  observableB.materialize() )
  .takeWhile( notification -> notification.hasValue() )
  .dematerialize()
  .subscribe( ... );

This will merge the two observables until either one of them completes or emits an error.

like image 145
Bob Dalgleish Avatar answered Sep 20 '22 17:09

Bob Dalgleish