 

How to merge two streams (without nulls) and apply conditions on pairs?

Suppose I have two streams of data. Is there a way to merge them and apply conditions to the data paired across these two streams? For example:

Stream A : A, B, C, D....
Stream B : -, A, -, -....
Composed : (A,-),(B,A),(C,-),(D,-)....

How can I get the composed stream above using RxJS? I would like to apply conditions to the composed stream in order to raise some notifications. Also, would it be possible to use the last known non-null value? For example, see the composed stream below.

Stream A : A, B, C, D....
Stream B : 1, null, 2, null....
Composed : (A,1),(B,1),(C,2),(D,2)....

I've just started playing with reactive streams idea, so please correct me if I've misunderstood the idea of reactive streams.

opensourcegeek asked Jan 31 '15 12:01



1 Answer

There are two operators that can serve your purpose.

Zip:
[Image: Rx Zip]
Reference for RxJs: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/zip.md

CombineLatest:
[Image: Rx CombineLatest]
Reference for RxJs: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/combinelatest.md

The images explain the difference between the two. Once you have combined the observables, you just need to filter with where to drop the pairs in which one of the values is null.

Unfortunately, neither operator on its own produces the behavior that you describe:

Stream A : A, B, C, D, E....
Stream B : 1, null, 2, null, 3....
Composed : (A,1),(B,1),(C,2),(D,2)....
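To pin down exactly what the target behavior is, here is a minimal sketch on plain arrays rather than observables. `pairWithLastKnown` is a hypothetical helper written for illustration, not an RxJS operator, and it assumes both streams emit in lockstep (one item from each per step), which real streams need not do.

```typescript
// Hypothetical helper, not part of RxJS: pairs each item of `a` with the most
// recent non-null item of `b` seen at the same position or earlier.
function pairWithLastKnown<A, B>(a: A[], b: (B | null)[]): [A, B][] {
  const pairs: [A, B][] = [];
  let lastKnown: B | null = null;
  a.forEach((item, i) => {
    // Treat positions past the end of `b` the same as a null.
    const current = b[i] ?? null;
    if (current !== null) {
      lastKnown = current;
    }
    // Skip positions where `b` has not produced any non-null value yet.
    if (lastKnown !== null) {
      pairs.push([item, lastKnown]);
    }
  });
  return pairs;
}

const carriedPairs = pairWithLastKnown(["A", "B", "C", "D"], [1, null, 2, null]);
console.log(carriedPairs.map(([x, y]) => `(${x},${y})`).join(","));
// prints (A,1),(B,1),(C,2),(D,2)
```

The point of the sketch is only to make the requirement precise: the second element is carried forward across nulls, while the first element always advances.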

If you use Zip and then Where (filtering out null values afterwards), the result will be:

Composed: (A,1),(C,2),(E,3)

If you use Where first (filtering out null values beforehand) and then Zip, the result will be:

Composed: (A,1),(B,2),(C,3)
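The two orderings can be checked with a small sketch that models the streams as plain arrays. `zipArrays` is a hypothetical stand-in for Zip written for this illustration (it pairs items by position), and the array `filter` plays the role of Where; neither is the RxJS API itself.

```typescript
type Pair<A, B> = [A, B];

// Array stand-in for Zip: pairs items by position, stopping at the shorter input.
function zipArrays<A, B>(a: A[], b: B[]): Pair<A, B>[] {
  const out: Pair<A, B>[] = [];
  const count = Math.min(a.length, b.length);
  for (let i = 0; i < count; i++) {
    out.push([a[i], b[i]]);
  }
  return out;
}

const lettersA = ["A", "B", "C", "D", "E"];
const numbersB: (number | null)[] = [1, null, 2, null, 3];

// Zip first, then drop the pairs whose second element is null.
const zipThenFilter = zipArrays(lettersA, numbersB).filter(
  (pair): pair is Pair<string, number> => pair[1] !== null
);

// Drop the nulls from stream B first, then zip.
const filterThenZip = zipArrays(
  lettersA,
  numbersB.filter((v): v is number => v !== null)
);

const showPairs = (pairs: Pair<string, number>[]): string =>
  pairs.map(([x, y]) => `(${x},${y})`).join(",");

console.log(showPairs(zipThenFilter)); // prints (A,1),(C,2),(E,3)
console.log(showPairs(filterThenZip)); // prints (A,1),(B,2),(C,3)
```

Filtering afterwards loses the items of stream A that were paired with a null, while filtering beforehand shifts the remaining values of stream B onto earlier items of stream A.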

If you use CombineLatest, the result depends on the order in which the events occur in the streams and, of course, on where you put the where operator. It can differ from what you showed, e.g.:

Stream A : A, B, C, D....
Stream B : 1, null, 2, null....
Composed : (A,1),(B,1),(C,1),(C,2),(D,2).... // OR
Composed : (A,1),(B,1),(B,2),(C,2),(D,2).... 
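The order dependence can be reproduced with a sketch that replays a merged timeline of emissions. `combineLatestOnTimeline`, `fromA`, and `fromB` are hypothetical names for this illustration, not RxJS functions, and the sketch assumes the nulls of stream B were already filtered out, so B only emits 1 and 2.

```typescript
// One emission on a merged timeline; `source` says which stream produced it.
type Emission =
  | { source: "a"; value: string }
  | { source: "b"; value: number };

const fromA = (value: string): Emission => ({ source: "a", value });
const fromB = (value: number): Emission => ({ source: "b", value });

// Array stand-in for CombineLatest: once both sides have emitted at least once,
// every new emission yields a pair with the latest value from the other side.
function combineLatestOnTimeline(timeline: Emission[]): string[] {
  let latestA: string | undefined;
  let latestB: number | undefined;
  const out: string[] = [];
  timeline.forEach((e) => {
    if (e.source === "a") {
      latestA = e.value;
    } else {
      latestB = e.value;
    }
    if (latestA !== undefined && latestB !== undefined) {
      out.push(`(${latestA},${latestB})`);
    }
  });
  return out;
}

// B's second value arrives after C.
const lateTwo = [fromA("A"), fromB(1), fromA("B"), fromA("C"), fromB(2), fromA("D")];
// B's second value arrives before C.
const earlyTwo = [fromA("A"), fromB(1), fromA("B"), fromB(2), fromA("C"), fromA("D")];

console.log(combineLatestOnTimeline(lateTwo).join(","));
// prints (A,1),(B,1),(C,1),(C,2),(D,2)
console.log(combineLatestOnTimeline(earlyTwo).join(","));
// prints (A,1),(B,1),(B,2),(C,2),(D,2)
```

The same values produce different pairs depending only on when stream B's second value arrives relative to stream A's items.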

Unless you have more specific requirements, I think one of the options I mentioned is what you are looking for. Feel free to add more information.

There are several ways to compose observables. Other operators not mentioned above are:

  • distinctUntilChanged, which could be added at the end of the composition, using the key selector function to restrict the comparison to just one part of the zipped or latest value.
  • switch, used to flatten an observable that emits other observables, following only the most recent inner one.
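As an illustration of the distinctUntilChanged suggestion, the sketch below applies the same idea to an array of already combined pairs. `distinctUntilKeyChanges` is a hypothetical array stand-in written for this example, not the RxJS operator, and the input is the first CombineLatest result shown earlier.

```typescript
// Array stand-in for distinctUntilChanged with a key selector: drops an item
// when its key equals the key of the item immediately before it.
function distinctUntilKeyChanges<T, K>(items: T[], key: (item: T) => K): T[] {
  return items.filter((item, i) => i === 0 || key(item) !== key(items[i - 1]));
}

const combinedPairs: [string, number][] = [
  ["A", 1], ["B", 1], ["C", 1], ["C", 2], ["D", 2],
];

const fmt = (pairs: [string, number][]): string =>
  pairs.map(([x, y]) => `(${x},${y})`).join(",");

// Keyed on the letter: the repeated C is dropped, so each letter appears once.
const byLetter = distinctUntilKeyChanges(combinedPairs, ([letter]) => letter);
console.log(fmt(byLetter)); // prints (A,1),(B,1),(C,1),(D,2)

// Keyed on the number: only pairs where the number changes are kept.
const byNumber = distinctUntilKeyChanges(combinedPairs, ([, n]) => n);
console.log(fmt(byNumber)); // prints (A,1),(C,2)
```

Which part of the pair is used as the key decides which duplicates are suppressed, so the selector should match the condition you want to notify on.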
J. Lennon answered Nov 02 '22 07:11
