
A forkJoin alternative for uncompleted observables?

    constructor(
        private route: ActivatedRoute,
        private http: Http
    ){
        // Observe parameter changes
        let paramObs = route.paramMap;

        // Fetch data once
        let dataObs = http.get('...');

        // Subscribe to both observables,
        // use both resolved values at the same level
    }

Is there something similar to forkJoin that triggers whenever a parameter change is emitted? forkJoin only works when all observables have completed.

I just need to avoid callback hell, so any alternative that achieves that is welcome.

Marek Toman asked Jul 18 '17 16:07



2 Answers

There are several options:

  1. Use take(1) with forkJoin() so that each source Observable completes after its first value:

    forkJoin(o1$.pipe(take(1)), o2$.pipe(take(1)))

  2. Use zip() and take(1) to emit only when all source Observables have emitted the same number of items:

    zip(o1$, o2$).pipe(take(1))

  3. Use combineLatest() to emit whenever any of the source Observables emits (once every source has emitted at least one value):

    combineLatest(o1$, o2$)

Jan 2019: Updated for RxJS 6
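
To make the difference between the three options concrete, here is a minimal sketch applied to something like the question's setup. The names `params$` and `data$` are hypothetical stand-ins (a `BehaviorSubject` that never completes in place of `route.paramMap`, and `of(...)` in place of `http.get(...)`), and the array form of `forkJoin`/`combineLatest` assumes a reasonably recent RxJS 6 or 7:

```typescript
import { BehaviorSubject, combineLatest, forkJoin, of, zip } from "rxjs";
import { take } from "rxjs/operators";

// Hypothetical stand-ins for the question's sources:
// params$ never completes (like route.paramMap),
// data$ emits once and completes (like a single HTTP request).
const params$ = new BehaviorSubject<string>("id=1");
const data$ = of("payload");

const forkJoinLog: string[] = [];
const zipLog: string[] = [];
const combineLog: string[] = [];

// 1. forkJoin + take(1): each source is forced to complete, so this emits once.
forkJoin([params$.pipe(take(1)), data$.pipe(take(1))])
  .subscribe(([p, d]) => forkJoinLog.push(`${p}|${d}`));

// 2. zip + take(1): pairs the first value of each source, then completes.
zip(params$, data$).pipe(take(1))
  .subscribe(([p, d]) => zipLog.push(`${p}|${d}`));

// 3. combineLatest: re-emits on every parameter change, reusing the last data value.
const sub = combineLatest([params$, data$])
  .subscribe(([p, d]) => combineLog.push(`${p}|${d}`));

// Simulate two later parameter changes.
params$.next("id=2");
params$.next("id=3");
sub.unsubscribe();

console.log(forkJoinLog); // [ 'id=1|payload' ]
console.log(zipLog);      // [ 'id=1|payload' ]
console.log(combineLog);  // [ 'id=1|payload', 'id=2|payload', 'id=3|payload' ]
```

Only option 3 reacts to the later parameter changes, which is probably what the question is after. Note that it keeps reusing the same fetched value; if the data should be re-fetched per parameter, a different operator would be needed.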

martin answered Oct 29 '22 17:10


A small trick to keep forkJoin from failing as a whole when any one of the source observables errors: catch the error on that source and replace it with a fallback value.

    import { throwError, of, forkJoin } from "rxjs";
    import { catchError, take } from "rxjs/operators";

    const observables$ = [];
    const observableThatWillComplete$ = of(1, 2, 3, 4, 5).pipe(take(1));

    // emits an error with specified value on subscription
    const observableThatWillFail$ = throwError(
      "This is an error hence breaking the stream"
    ).pipe(catchError((error) => of(`Error Catched: ${error}`)));

    observables$.push(observableThatWillComplete$, observableThatWillFail$);

    forkJoin(observables$).subscribe(responses => {
      console.log("Subscribed");
      console.log(responses);
    });

khizer answered Oct 29 '22 16:10