In the Observables forkJoin documentation, it says that args can be an array but it doesn't list an example doing so:
https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/forkjoin.md
I tried a function similar to the one listed below, but got an error:
:3000/angular2/src/platform/browser/browser_adapter.js:76
EXCEPTION: TypeError: Observable_1.Observable.forkJoin is not a function
A pared-down version of my function is below:
processStuff( inputObject ) {
  let _self = this;
  return new Observable(function(observer) {
    let observableBatch = [];
    inputObject.forEach(function(componentarray, key) {
      observableBatch.push(_self.http.get(key + '.json').map((res: Response) => res.json()));
    });
    Observable.forkJoin(
      observableBatch
    // );
    ).subscribe(() => {
      observer.next();
      observer.complete();
    });
  });
}
The root of my question is how to wait for all function calls in a loop to end before proceeding, as asked here: Angular2 Observable - how to wait for all function calls in a loop to end before proceeding?
But I haven't fully mastered the correct use of forkJoin with an array and the right syntax to do so.
I would be very grateful for any help you could offer.
thirdFunction() {
  let _self = this;
  return Observable.create((observer) => {
  // return new Observable(function(observer) {
    ...
    observer.next(responseargs);
    observer.complete();
  });
}
processStuff(inputObject) {
  let _self = this;
  let observableBatch = [];
  inputObject.forEach((componentarray, key) => {
    observableBatch.push(_self.thirdFunction().map((res: Response) => res.json()));
  });
  return Observable.forkJoin(observableBatch);
}
elsewhere() {
  this.processStuff(inputObject)
    .subscribe()
}
forkJoin is an operator that takes any number of input observables which can be passed either as an array or a dictionary of input observables. If no input observables are provided (e.g. an empty array is passed), then the resulting stream will complete immediately.
“If any input observable errors at some point, forkJoin will error as well and all other observables will be immediately unsubscribed.” So the values already retrieved from the other observables (for example, todo1$ and todo2$) are lost.
forkJoin - When all observables complete, emit the last emitted value from each. combineLatest - When any observable emits a value, emit the latest value from each.
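To make the difference concrete, here is a dependency-free TypeScript sketch that models both behaviours. It is only an illustration under simplifying assumptions, not the RxJS implementation: `Sink`, `Source`, `fromValues`, `forkJoinSketch` and `combineLatestSketch` are hypothetical names invented for this example, sources push their values synchronously, and errors are left out.

```typescript
// Toy model of an observable: a function that pushes values into a sink.
// All names here are hypothetical and are not part of RxJS.
interface Sink<T> {
  next(value: T): void;
  complete(): void;
}
type Source<T> = (sink: Sink<T>) => void;

// Emits the given values synchronously, then completes.
function fromValues<T>(...values: T[]): Source<T> {
  return (sink) => {
    values.forEach((v) => sink.next(v));
    sink.complete();
  };
}

// forkJoin-like: emit once, after every source has completed,
// with the last value seen from each source.
function forkJoinSketch<T>(sources: Source<T>[]): Source<T[]> {
  return (sink) => {
    if (sources.length === 0) {
      sink.complete(); // empty input completes immediately, emitting nothing
      return;
    }
    const last = new Array<T>(sources.length);
    const seen = sources.map(() => false);
    let remaining = sources.length;
    sources.forEach((source, i) =>
      source({
        next: (value) => {
          last[i] = value;
          seen[i] = true;
        },
        complete: () => {
          remaining -= 1;
          if (remaining > 0) return;
          if (seen.every(Boolean)) sink.next([...last]);
          sink.complete();
        },
      })
    );
  };
}

// combineLatest-like: once every source has emitted at least once,
// emit the latest value of each whenever any source emits.
function combineLatestSketch<T>(sources: Source<T>[]): Source<T[]> {
  return (sink) => {
    if (sources.length === 0) {
      sink.complete();
      return;
    }
    const latest = new Array<T>(sources.length);
    const seen = sources.map(() => false);
    let remaining = sources.length;
    sources.forEach((source, i) =>
      source({
        next: (value) => {
          latest[i] = value;
          seen[i] = true;
          if (seen.every(Boolean)) sink.next([...latest]);
        },
        complete: () => {
          remaining -= 1;
          if (remaining === 0) sink.complete();
        },
      })
    );
  };
}

const inputs = [fromValues(1, 2, 3), fromValues(10, 20)];
const joined: number[][] = [];
const combined: number[][] = [];
forkJoinSketch(inputs)({ next: (v) => { joined.push(v); }, complete: () => {} });
combineLatestSketch(inputs)({ next: (v) => { combined.push(v); }, complete: () => {} });
console.log(joined);   // [[3, 20]]
console.log(combined); // [[3, 10], [3, 20]]
```

The forkJoin-like version emits exactly once, at the end, while the combineLatest-like version emits on every value once both sources have produced something.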
The forkJoin method signature has changed: passing the observables as separate arguments is now deprecated, and you should pass an array of observables to forkJoin instead.
You need to import operators that are not loaded by default. That's what "EXCEPTION: Observable.xxxx is not a function" usually means. You can either import all operators by adding the complete rxjs library to your bootstrap, for example:
import 'rxjs/Rx'
or by importing specific operators, in your case:
import 'rxjs/add/observable/forkJoin'
Another observation/suggestion about your code: try to stick with one syntax. You are mixing ES5, ES6 and TypeScript, and while it works, it will only confuse you in the long run. Also, if you're just starting with Observables, try to avoid new Observable() and use creation operators instead:
processStuff( inputObject ) {
  let observableBatch = [];
  inputObject.forEach(( componentarray, key ) => {
    observableBatch.push( this.http.get( key + '.json').map((res: Response) => res.json()) );
  });
  return Observable.forkJoin(observableBatch);
}
elsewhere() {
  this.processStuff( inputObject )
    .subscribe()
}
Finally, refer to the correct documentation: Angular2 uses RxJS v5, and the link you provided is for RxJS v4. The docs are still incomplete for v5, but you can find descriptions in many of the source files.
Here is a working demo using Angular 12. The syntax is quite a bit different than the accepted answer (No more Observable.):
https://stackblitz.com/edit/angular-map-to-forkjoin?file=src/app/app.component.ts
NOTE: Open the Developer Tools console to see the output of what I'm doing
First, this takes an array of JSON items (in this case, an array of Users) and converts them to individual HTTP PATCH calls using the map method.
private mapCalls(users: any[]): Observable<any>[] {
  return users.map(user => {
    // Each User will be patched to the endpoint
    // None of these calls will be made until the forkJoin result is subscribed to
    return this.http.patch(
      `https://jsonplaceholder.typicode.com/users/${user.id}`,
      user
    );
  });
}
That is now an array of Observables. forkJoin is then used as a wrapper that combines those calls into a single Observable:
return forkJoin(mappedCalls);
The result can then be subscribed to, which is what actually triggers the calls. It emits a single array with the same number of items, in the same order as the input.
NOTE: If one call fails in a forkJoin, THEY WILL ALL FAIL.
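The all-or-nothing behaviour in the note above, and one common way around it, can be sketched with a small dependency-free TypeScript model. This is an illustration under stated assumptions rather than the RxJS implementation: `Sink`, `Source`, `succeed`, `fail`, `forkJoinSketch` and `recover` are hypothetical names made up for this example, and sources run synchronously. In RxJS itself, the equivalent of `recover` is usually the catchError operator piped onto each inner observable.

```typescript
// Toy model of an observable with an error channel.
// All names here are hypothetical and are not part of RxJS.
interface Sink<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}
type Source<T> = (sink: Sink<T>) => void;

// A source that emits one value and completes.
function succeed<T>(value: T): Source<T> {
  return (sink) => {
    sink.next(value);
    sink.complete();
  };
}

// A source that fails without emitting anything.
function fail<T>(message: string): Source<T> {
  return (sink) => sink.error(new Error(message));
}

// forkJoin-like join: the first error fails the whole result, and
// sources that have not been started yet are skipped.
function forkJoinSketch<T>(sources: Source<T>[]): Source<T[]> {
  return (sink) => {
    if (sources.length === 0) {
      sink.complete();
      return;
    }
    const last = new Array<T>(sources.length);
    let remaining = sources.length;
    let failed = false;
    for (let i = 0; i < sources.length; i++) {
      if (failed) break;
      sources[i]({
        next: (value) => { last[i] = value; },
        error: (err) => {
          if (failed) return;
          failed = true;
          sink.error(err);
        },
        complete: () => {
          remaining -= 1;
          if (remaining === 0 && !failed) {
            sink.next([...last]);
            sink.complete();
          }
        },
      });
    }
  };
}

// catchError-like wrapper: turn a failure into a fallback value so the
// join still receives a value and a completion from this source.
function recover<T>(source: Source<T>, fallback: T): Source<T> {
  return (sink) =>
    source({
      next: (value) => sink.next(value),
      error: () => {
        sink.next(fallback);
        sink.complete();
      },
      complete: () => sink.complete(),
    });
}

type Result = string | null;
const calls: Source<Result>[] = [
  succeed<Result>("user 1"),
  fail<Result>("user 2 failed"),
  succeed<Result>("user 3"),
];

const log: string[] = [];
const record: Sink<Result[]> = {
  next: (values) => { log.push("next " + JSON.stringify(values)); },
  error: (err) => { log.push("error " + (err instanceof Error ? err.message : String(err))); },
  complete: () => { log.push("complete"); },
};

forkJoinSketch(calls)(record);                              // one failure fails everything
forkJoinSketch(calls.map((c) => recover(c, null)))(record); // failures become null entries
console.log(log);
// ["error user 2 failed", "next [\"user 1\",null,\"user 3\"]", "complete"]
```

Handling the error on each inner call keeps the successful results, at the cost of having to check each entry for the fallback value afterwards.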