I'm pushing observables into an array like so:
var tasks$ = [];
tasks$.push(Observable.timer(1000));
tasks$.push(Observable.timer(3000));
tasks$.push(Observable.timer(10000));
I want an Observable that emits once every Observable in tasks$ has completed. Keep in mind that, in practice, the number of Observables in tasks$ is not known in advance.
I've tried Observable.zip(tasks$).subscribe(), but this seems to fail when there is only one task, which leads me to believe that zip requires an even number of elements in order to work the way I would expect.
I've tried Observable.concat(tasks$).subscribe(), but the result of the concat operator just seems to be an array of observables, i.e. basically the same as the input. You can't even call subscribe on it.
In C# this would be akin to Task.WhenAll(). With ES6 promises it would be akin to Promise.all().
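To make the desired behavior concrete, here is a minimal sketch of the Promise.all() analogy with a list whose length is only known at runtime. The delay helper and the task labels are hypothetical names introduced for illustration; they are not part of the question.

```javascript
// Hypothetical helper: resolves with `label` after `ms` milliseconds.
function delay(ms, label) {
  return new Promise(resolve => setTimeout(() => resolve(label), ms));
}

// The array is built at runtime, so its length is not known up front.
const tasks = [];
for (const ms of [10, 30, 20]) {
  tasks.push(delay(ms, `task-${ms}`));
}

// Resolves once every promise has resolved; results keep the input order,
// not the order in which the promises finished.
const allDone = Promise.all(tasks);
allDone.then(results => console.log(results)); // ['task-10', 'task-30', 'task-20']
```

The Observable equivalent being asked for should behave the same way: one notification after the slowest task finishes, regardless of how many tasks were pushed.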
I've come across a number of SO questions but they all seem to deal with waiting on a known number of streams (e.g. mapping them together).
forkJoin - When all observables complete, emit the last emitted value from each.
combineLatest - When any observable emits a value, emit the latest value from each (this starts only once every observable has emitted at least one value).
complete() after it has emitted all of its values. There's no need to unsubscribe. It completes on its own, which means it unsubscribes all subscribers automatically. This is also the reason why you don't often notice any memory leaks.
forkJoin is an operator that takes any number of input observables which can be passed either as an array or a dictionary of input observables. If no input observables are provided (e.g. an empty array is passed), then the resulting stream will complete immediately.
If you want to compose an observable that emits when all of the source observables complete, you can use forkJoin:
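import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/forkJoin';
import 'rxjs/add/observable/timer'; // needed for Observable.timer with patch-style imports
import 'rxjs/add/operator/first';

var tasks$ = [];
// first() guarantees that each source completes after one value,
// which forkJoin needs before it can emit.
tasks$.push(Observable.timer(1000).first());
tasks$.push(Observable.timer(3000).first());
tasks$.push(Observable.timer(10000).first());

// Emits one array of results once every task has completed.
Observable.forkJoin(...tasks$).subscribe(results => {
  console.log(results);
});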