
RxJS: alternately combine elements of streams

Tags:

rxjs

I'd like to alternately combine elements of multiple streams:

var print = console.log.bind(console);

var s1 = Rx.Observable.fromArray([1, 1, 5]);
var s2 = Rx.Observable.fromArray([2, 9]);
var s3 = Rx.Observable.fromArray([3, 4, 6, 7, 8]);

alternate(s1, s2, s3).subscribe(print); // 1, 2, 3, 1, 9, 4, 5, 6, 7, 8

What would the definition of the alternate function look like?

maiermic asked Feb 22 '15 10:02


1 Answer

Use zip and concatMap when working on observables that were created from arrays (as in your example), or zip and flatMap when working on observables that are inherently asynchronous.

Rx.Observable
  .zip(s1, s2, s3, function(x,y,z) { return [x,y,z]; })
  .concatMap(function (list) { return Rx.Observable.from(list); })
  .subscribe(print); // 1, 2, 3, 1, 9, 4

Note that this produces no further output once one of the source observables completes. That's because zip is strictly "balanced": it emits only when every source has supplied a matching element at the same position. For your expected output you need a looser version of zip that skips completed sources and keeps going with the remaining ones.
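
To make the "loose" semantics concrete, here is a minimal sketch on plain arrays rather than observables. The helper name alternateArrays is hypothetical and is not part of RxJS; it only illustrates the ordering a loose zip would need to produce, and it assumes all values are available synchronously.

```javascript
// Hypothetical helper (not an RxJS operator): round-robin interleave of
// plain arrays, skipping any array that has already run out of elements.
function alternateArrays(...arrays) {
  const result = [];
  // The 0 guards against calling Math.max with no arguments.
  const longest = Math.max(0, ...arrays.map(a => a.length));
  for (let i = 0; i < longest; i++) {
    for (const arr of arrays) {
      // Take the i-th element from every array that still has one.
      if (i < arr.length) {
        result.push(arr[i]);
      }
    }
  }
  return result;
}

const merged = alternateArrays([1, 1, 5], [2, 9], [3, 4, 6, 7, 8]);
console.log(merged.join(', ')); // 1, 2, 3, 1, 9, 4, 5, 6, 7, 8
```

Unlike zip, the loop does not stop at the shortest input: exhausted arrays are simply skipped on each round. An observable-based version would need the same rule, applied as sources complete.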

André Staltz answered Oct 05 '22 12:10