Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJS 6 - Cancel / End a Pipe

Working with the new version of RxJS 6 and the pipe operator in particular. Currently using the pipe to take the results of an API call and pass them to a series of additional tasks.

All works great, but can't seem to find a way to cancel or end a pipe should I encounter an issue. For example, I'm using the tap operator to check if the value is null. I then throw an error, but the pipe still appears to move to the next task, in this case concatmap.

Therefore, how do you end or cancel a pipe prematurely? Thanks in advance.

getData(id: String): Observable<any[]> { return this.http.get<any>(`${this.baseUrl}/path/${id}`).pipe(    tap(evt => {     if (evt == null) {       return throwError(new Error("No data found..."));     }   }), concatMap(   evt =>      <Observable<any[]>>(         this.http.get<any[]>(     `${this.baseUrl}/path/relatedby/${evt.child_id}`       ).map(res =>( {"response1":evt, "response2":res}) )  ) ), retry(3), catchError(this.handleError("getData", [])) );} 
like image 287
RookieMcRookie Avatar asked Jul 23 '18 21:07

RookieMcRookie


People also ask

What does pipe () do RxJS?

pipe() can be called on one or more functions, each of which can take one argument ("UnaryFunction") and uses it to return a value. It returns a function that takes one argument, passes it to the first UnaryFunction, and then passes the result to the next one, passes that result to the next one, and so on.

What is TAP () in RxJS?

RxJS tap() operator is a utility operator that returns an observable output that is identical to the source observable but performs a side effect for every emission on the source observable.

Does pipe return a new Observable?

A Pipeable Operator is a function that takes an Observable as its input and returns another Observable. It is a pure operation: the previous Observable stays unmodified. A Pipeable Operator is essentially a pure function which takes one Observable as input and generates another Observable as output.

What is of () RxJS?

RxJS' of() is a creational operator that allows you to create an RxJS Observable from a sequence of values. According to the official docs: of() converts the arguments to an observable sequence. In Angular, you can use the of() operator to implement many use cases.


2 Answers

I tried the basic concept from what you have with this stackblitz and it worked. It cancelled the remaining operations. See the link below.

https://stackblitz.com/edit/angular-4ctwsd?file=src%2Fapp%2Fapp.component.ts

Differences I see between your code and mine is that I used throw and not throwError (is that something you wrote?) and I'm just throwing the error ... not returning a thrown error.

Here is the code for reference:

import { Component } from '@angular/core'; import { of, from } from 'rxjs'; import { map, catchError, tap, retry} from 'rxjs/operators';  @Component({   selector: 'my-app',   templateUrl: './app.component.html',   styleUrls: ['./app.component.css'] }) export class AppComponent {   name = 'Angular 6';    constructor() {     of('a', 'b', 'c', 'd')       .pipe(        map(x => {         if (x === 'c') {           throw 'An error has occurred';         }         return x;        }),        tap(x => console.log('In tap: ', x)),        retry(3),        catchError(() => of('caught error!'))       )       .subscribe(x => console.log(x));   } } 
like image 160
DeborahK Avatar answered Sep 17 '22 17:09

DeborahK


You can also cancel/end a pipe by using a signal Subject and the rxjs operator: takeUntil

Example

httpGetSafe(path: string): Observable<Data> {   const stopSignal$ = new Subject();    return this.http.get<Data>(path).pipe(     map(data => {       const isBad = data === null;       if (isBad) {         stopSignal$.next();       }       return data;     }),     takeUntil(stopSignal$)   ); } 

Sometimes it's a bit simpler and more flexible than throwing errors...

like image 44
Ben Winding Avatar answered Sep 17 '22 17:09

Ben Winding