Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RXJS Wait for all observables in an array to complete (or error)

I'm pushing observables into an array like such...

var tasks$ = []; tasks$.push(Observable.timer(1000)); tasks$.push(Observable.timer(3000)); tasks$.push(Observable.timer(10000)); 

I want an Observable that emits when all tasks$ have completed. Keep in mind, in practice, tasks$ doesn't have a known number of Observables.

I've tried Observable.zip(tasks$).subscribe() but this seems to fail in the event that there is only 1 task, and is leading me to believe that ZIP requires an even number of elements in order to work the way I would expect.

I've tried Observable.concat(tasks$).subscribe() but the result of the concat operator just seems to be an array of observables... e.g. basically the same as the input. You can't even call subscribe on it.

In C# this would be akin to Task.WhenAll(). In ES6 promise it would be akin to Promise.all().

I've come across a number of SO questions but they all seem to deal with waiting on a known number of streams (e.g. mapping them together).

like image 255
josh-sachs Avatar asked Jan 19 '17 06:01

josh-sachs


People also ask

What is the difference between combineLatest and forkJoin?

forkJoin - When all observables complete, emit the last emitted value from each. combineLatest - When any observable emits a value, emit the latest value from each.

Do I need to unsubscribe from a completed observable RXJS?

complete() after it has emitted all of it's values. There's no need to unsubscribe. It completes on it's own, which means it unsubscribes all subscribers automatically. This is also the reason why you don't often notice any memory leaks.

What is forkJoin RXJS?

forkJoin is an operator that takes any number of input observables which can be passed either as an array or a dictionary of input observables. If no input observables are provided (e.g. an empty array is passed), then the resulting stream will complete immediately.


1 Answers

If you want to compose an observable that emits when all of the source observables complete, you can use forkJoin:

import { Observable } from 'rxjs/Observable'; import 'rxjs/add/observable/forkJoin'; import 'rxjs/add/operator/first';  var tasks$ = []; tasks$.push(Observable.timer(1000).first()); tasks$.push(Observable.timer(3000).first()); tasks$.push(Observable.timer(10000).first()); Observable.forkJoin(...tasks$).subscribe(results => { console.log(results); }); 
like image 62
cartant Avatar answered Nov 10 '22 03:11

cartant