Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RXJS Combining multiple observables inside a pipe

I have an API call that returns a certain amount of ids. Each of these ids are used to make a new api call. The results of these API calls need to be combined into a single object.

At first I used a loop inside the .pipe(map) operator of the first api call. In this loop I did the second api calls, and in the .pipe(map) operator in each of these calls I would edit a variable in my angular component.

This wasn't very pretty, and I was actually wondering if this is thread safe. I know javascript is single threaded, but it doesn't seem very safe to have multiple asynchronous processes messing with the same global variable.

after that I just stored the observable returned by the second api call in an array by looping over the returned Ids by apiCall1, and used forkJoin to subscribe and handle each result accordingly (see example).

This isn't very pretty however, and I was wondering if there's an operator I can use in my pipe for this?

So instead of (pseudocode):

  .pipe(
      map(ids=> {

        let observables = []
        for (const id of ids) {
         observables.push(this.service.getSomeStuff(id));
        }

        forkJoin(...observables).subscribe(dataArray) => {
          for (data of dataArray) {
            //Do something
          }
        });

      }),
      takeWhile(() => this.componentActive),
      catchError(error => {
        console.log(error);
        return throwError(error);
      })
    )
    .subscribe();

Is there an operator that makes it something like this:

  .pipe(
      map(ids=> {

        let observables = []
        for (const id of ids) {
         observables.push(this.service.getSomeStuff(id));
        }

      return observables
      }),
      forkJoin(dataArray => {
          for (data of dataArray) {
            //Do something
          }
        });
      takeWhile(() => this.componentActive),
      catchError(error => {
        console.log(error);
        return throwError(error);
      })
    )
    .subscribe();
like image 361
BartKrul Avatar asked Jun 17 '19 14:06

BartKrul


People also ask

How do I combine multiple Observables into one?

concat. We can use the concat operator to take multiple Observables and return a new Observable that sequentially emits values from each Observable that were passed in. It works by subscribing to them one at a time and merging the results in the output Observable.

How do I combine two Observables RxJS?

The RxJS merge() operator is a join operator that is used to turn multiple observables into a single observable. It creates an output Observable, which concurrently emits all values from every given input Observables.

Does pipe subscribe to Observable?

A Pipeable Operator is essentially a pure function which takes one Observable as input and generates another Observable as output. Subscribing to the output Observable will also subscribe to the input Observable.

What does pipe () do RxJS?

Pipes let you combine multiple functions into a single function. The pipe() function takes as its arguments the functions you want to combine, and returns a new function that, when executed, runs the composed functions in sequence.


3 Answers

Instead of modifying your data at once using for, why don't you do it as the data is received? Something like this.

source$.pipe(
  mergeMap(ids => from(ids)),
  mergeMap(id => this.service.getSomeStuff(id)),
  tap(data => //do someting with the data);
  takeWhile(() => this.componentActive),
  catchError(error => {
    console.log(error);
    return throwError(error);
  }),
  toArray(), --> this create the array of all the values received.
)
.subscribe(data => //returns array of modified data);
like image 67
emkay Avatar answered Oct 12 '22 02:10

emkay


This is what you can do:

sourceObservable$.pipe(
  // depends on your need here you can use mergeMap as well
  switchMap(ids => {
    const observables = ids.map(i => this.service.getSomeStuff(id));
    return forkJoin(observables);
  }),
  tap(joined => {
    // joined will be an array of values of the observables in the same
    // order as you pushed in forkJoin
    for (data of joined) {
      // do something
    }
  }),
  takeWhile(() => this.componentActive),
  catchError(error => {
    console.log(error);
    return throwError(error);
  })
)
.subscribe();
like image 9
user2216584 Avatar answered Oct 12 '22 02:10

user2216584


combineLatest(...observables)

will only emit after all observables have emitted and you will have an array of the results.

like image 3
Adrian Brand Avatar answered Oct 12 '22 03:10

Adrian Brand