Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Observable.forkJoin and array argument

In the Observables forkJoin documentation, it says that args can be an array but it doesn't list an example doing so:

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/forkjoin.md

I have tried a function similar to what I listed (below) but came up with an error:

:3000/angular2/src/platform/browser/browser_adapter.js:76 

EXCEPTION: TypeError: Observable_1.Observable.forkJoin is not a function

A sheared version of my function below:

processStuff( inputObject ) {
  let _self = this;

  return new Observable(function(observer) {
    let observableBatch = [];

    inputObject.forEach(function(componentarray, key) {
      observableBatch.push(_self.http.get(key + '.json').map((res: Response) => res.json()));
    });

    Observable.forkJoin(
      observableBatch
    // );
    ).subscribe(() => {
      observer.next();
      observer.complete();
    });

  });
}

The root of my question is related to a loop to end before proceeding as asked here: Angular2 Observable - how to wait for all function calls in a loop to end before proceeding?

But I haven't fully mastered the correct use of forkJoin with an array and the right syntax to do so.

I am very grateful for help you could offer.

NOTE: EXAMPLE OF THIRD FUNCTION THAT RETURNS AN OBSERVABLE

thirdFunction() {
  let _self = this;

  return Observable.create((observer) => {
  // return new Observable(function(observer) {
    ...

    observer.next(responseargs);
    observer.complete();
  });
}

processStuff(inputObject) {
  let _self = this;
  let observableBatch = [];

  inputObject.forEach((componentarray, key) => {
    observableBatch.push(_self.thirdFunction().map((res: Response) => res.json()));
  });

  return Observable.forkJoin(observableBatch);
}

elsewhere() {
  this.processStuff(inputObject)
    .subscribe()
}
like image 797
Benjamin McFerren Avatar asked Feb 27 '16 22:02

Benjamin McFerren


People also ask

What is observable forkJoin?

forkJoin is an operator that takes any number of input observables which can be passed either as an array or a dictionary of input observables. If no input observables are provided (e.g. an empty array is passed), then the resulting stream will complete immediately.

What happens if observable fails in forkJoin?

“If any input observable errors at some point, forkJoin will error as well and all other observables will be immediately unsubscribed.” So, you just lost the retrieved information from todo1$ and todo2$ .

What is the difference between combineLatest and forkJoin?

forkJoin - When all observables complete, emit the last emitted value from each. combineLatest - When any observable emits a value, emit the latest value from each.

Is forkJoin deprecated?

ForkJoin method signature has changedThis is now deprecated and you must pass an array of observables to forkJoin operator.


2 Answers

You need to import operators that are not loaded by default. That's what EXCEPTION Observable.xxxx is not a function usually means. You can either import all operators by adding complete rxjs to your bootstrap, for example:

import 'rxjs/Rx'

or by importing specific operators, in your case:

import 'rxjs/add/observable/forkJoin'

Another observation/suggestion about your code: try to stick with one syntax. You are mixing es5, es6, typescript... and while it is working it will only confuse you in the long run. Also, if you're just starting with Observables, try to avoid new Observable() and use creation operators instead;

processStuff( inputObject ) {
  let observableBatch = [];

  inputObject.forEach(( componentarray, key ) => {
    observableBatch.push( this.http.get( key + '.json').map((res: Response) => res.json()) );
  });

  return Observable.forkJoin(observableBatch);
}

elsewhere() {
  this.processStuff( inputObject )
    .subscribe()
}

Finally, refer to the correct documentation - Angular2 uses RxJS v5 and link you provided is for RxJS v4. Docs are still incomplete for v5, but you can find descriptions in many of the source files.

like image 175
Sasxa Avatar answered Oct 23 '22 22:10

Sasxa


Here is a working demo using Angular 12. The syntax is quite a bit different than the accepted answer (No more Observable.):

https://stackblitz.com/edit/angular-map-to-forkjoin?file=src/app/app.component.ts

NOTE: Open the Developer Tools console to see the output of what I'm doing

First, this takes an array of JSON items (In this case, an array of Users) and converts them to individual HTTP Patch calls using the map method.

  private mapCalls(users: any[]): Observable<any>[] {
    return users.map(user => {
      // Each User will be patched to the endpoint
      // None of these calls will be made until forkJoin is used
      return this.http.patch(
        `https://jsonplaceholder.typicode.com/users/${user.id}`,
        user
      );
    });
  }

That is now an array of Observables. forkJoin is added as a wrapper in order for those calls to be made

return forkJoin(mappedCalls);

The end result can then be subscribed to which will output as a single array with the same count of items.

NOTE: If one call fails in a forkJoin, THEY WILL ALL FAIL.

like image 32
austinthedeveloper Avatar answered Oct 23 '22 22:10

austinthedeveloper