
RxJS: How to merge observables dynamically

Tags: rxjs

I want to combine 2 or more observables dynamically into one combined observable.

I understood how to combine two observables that already exist with merge, but how can I solve "merging" when the additional observable needs to be added dynamically (e.g. after a timeout)?

Also the existing subscriptions on combinedStream$ should not be lost when merging another observable "on the fly".

Here is what I have so far:

import { interval, merge } from 'rxjs';
import { map } from 'rxjs/operators';

const action1$ = interval(1000).pipe(map(data => 'Action1 value:' + data));
const action2$ = interval(1000).pipe(map(data => 'Action2 value:' + data));

const combinedStream$ = merge(action1$, action2$);
combinedStream$.subscribe(data => console.log('Combined Stream Output:', data));

// Add another observable after some time...
setTimeout(() => {
  const action3$ = interval(1000).pipe(map(data => 'Action3 value:' + data));
  // How do I add action3$ to the combined stream?
}, 1000);

Here is my stackblitz: https://stackblitz.com/edit/rxjs-s2cyzj

spierala asked Jan 26 '20 20:01



1 Answer

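For this use case, the easiest approach is to have an observable of observables and flatten it with a higher-order operator such as mergeAll, concatAll, or switchAll (named switch in RxJS 5). Push each inner observable through a Subject; mergeAll subscribes to every one as it arrives, so streams added later join the output without touching existing subscriptions to the combined stream: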

import { interval, Observable, Subject } from "rxjs";
import { map, mergeAll } from "rxjs/operators";

const action1$: Observable<string> = interval(2000).pipe(
  map(data => "Action1 value:" + data)
);
const action2$: Observable<string> = interval(2000).pipe(
  map(data => "Action2 value:" + data)
);
const action3$: Observable<string> = interval(2000).pipe(
  map(data => "Action3 value:" + data)
);

// Outer stream: every value it emits is itself an observable of strings.
const mainStream$$ = new Subject<Observable<string>>();

// mergeAll subscribes to each inner observable as soon as it is emitted.
const combinedStream$ = mainStream$$.pipe(mergeAll());

// Subscribe before pushing: a Subject does not replay earlier emissions.
combinedStream$.subscribe(data => console.log("Combined Stream Output:", data));

mainStream$$.next(action1$);
mainStream$$.next(action2$);
// Add another stream after some time...
setTimeout(() => {
  mainStream$$.next(action3$);
}, 1000);
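
The code above uses mergeAll, but the other flattening operators mentioned treat a newly added inner stream differently. The following is only a minimal sketch, assuming RxJS 6+ import paths; the names outer$$, a$ and b$ are hypothetical and not part of the original answer. The inner streams are Subjects so that the emission order is explicit rather than timer-driven.

```typescript
import { Observable, Subject } from "rxjs";
import { concatAll, mergeAll, switchAll } from "rxjs/operators";

// Outer stream of inner streams, same shape as mainStream$$ in the answer.
const outer$$ = new Subject<Observable<string>>();

// Hypothetical inner streams, driven by hand.
const a$ = new Subject<string>();
const b$ = new Subject<string>();

const merged: string[] = [];
const concatenated: string[] = [];
const switched: string[] = [];

outer$$.pipe(mergeAll()).subscribe(v => merged.push(v));
outer$$.pipe(concatAll()).subscribe(v => concatenated.push(v));
outer$$.pipe(switchAll()).subscribe(v => switched.push(v));

outer$$.next(a$);
outer$$.next(b$); // "added dynamically" while a$ is still active

a$.next("a1");
b$.next("b1");
a$.next("a2");
a$.complete();
b$.next("b2");

console.log(merged);       // ["a1", "b1", "a2", "b2"]: both inner streams stay live
console.log(concatenated); // ["a1", "a2", "b2"]: b$ is only subscribed once a$ completes
console.log(switched);     // ["b1", "b2"]: a$ is dropped as soon as b$ arrives
```

For the question's requirement (keep everything that is already running and add more), only the mergeAll behaviour fits; concatAll would never reach a later stream while an interval is still emitting, and switchAll would cancel the earlier ones.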

Demo of the interval-based example: https://stackblitz.com/edit/rxjs-stwvtc?file=index.ts
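
The question also asks that existing subscriptions on the combined stream survive when a source is added. As a rough sketch of how that could be checked, the Subject plus mergeAll pattern can be wrapped in a small helper. createDynamicMerge, combined$ and add are hypothetical names introduced here, not an RxJS API, and synchronous of(...) sources stand in for the intervals so the result is deterministic.

```typescript
import { Observable, Subject, of } from "rxjs";
import { mergeAll } from "rxjs/operators";

// Hypothetical helper (not part of RxJS): hides the Subject behind an add() function.
function createDynamicMerge<T>() {
  const sources$$ = new Subject<Observable<T>>();
  return {
    combined$: sources$$.pipe(mergeAll()),
    add: (source$: Observable<T>): void => sources$$.next(source$),
  };
}

const { combined$, add } = createDynamicMerge<string>();

const early: string[] = [];
const late: string[] = [];

// A plain Subject is hot: a source added before anyone subscribes is dropped.
add(of("lost"));

combined$.subscribe(v => early.push(v));
add(of("a1", "a2"));

// A second subscriber joins later; the first subscription is untouched.
combined$.subscribe(v => late.push(v));
add(of("b1"));

console.log(early); // ["a1", "a2", "b1"]
console.log(late);  // ["b1"]
```

The first subscriber keeps receiving values after new sources are added, which is the behaviour the question asks for. The late subscriber only sees sources added after it subscribed; if it should also pick up earlier sources, swapping the Subject for a ReplaySubject is one option to consider. Note too that each subscriber subscribes to the inner observables separately, so cold sources such as interval start one timer per subscriber.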

maxime1992 answered Nov 08 '22 19:11
