I'm working with RxJS 6 and the pipe method in particular. I'm currently using pipe to take the result of an API call and pass it through a series of additional tasks.
Everything works, but I can't find a way to cancel or end a pipe when I run into a problem. For example, I'm using the tap operator to check whether the value is null. I then throw an error, but the pipe still appears to move on to the next task, in this case concatMap.
So, how do you end or cancel a pipe prematurely? Thanks in advance.
getData(id: String): Observable<any[]> {
  return this.http.get<any>(`${this.baseUrl}/path/${id}`).pipe(
    tap(evt => {
      if (evt == null) {
        return throwError(new Error("No data found..."));
      }
    }),
    concatMap(evt =>
      <Observable<any[]>>(
        this.http
          .get<any[]>(`${this.baseUrl}/path/relatedby/${evt.child_id}`)
          .map(res => ({ "response1": evt, "response2": res }))
      )
    ),
    retry(3),
    catchError(this.handleError("getData", []))
  );
}
pipe() takes one or more functions, each of which accepts a single argument (a "UnaryFunction") and returns a value. It returns a new function that takes one argument, passes it to the first UnaryFunction, passes that result to the next one, and so on down the chain.
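To make that description concrete, here is a minimal sketch of the same left-to-right composition in plain TypeScript. It does not use RxJS: pipeSketch, addOne and double are hypothetical names made up for illustration, and the sketch is simplified so that every function takes and returns the same type.

```typescript
// A unary function: one argument in, one value out.
type UnaryFunction<T, R> = (source: T) => R;

// Hypothetical, simplified stand-in for a pipe utility (not the RxJS source).
// It returns a new function that feeds its argument through each function in order.
function pipeSketch<T>(...fns: Array<UnaryFunction<T, T>>): UnaryFunction<T, T> {
  return (input: T) => fns.reduce((acc, fn) => fn(acc), input);
}

const addOne = (n: number): number => n + 1;
const double = (n: number): number => n * 2;

// Composition runs left to right: addOne first, then double.
const addOneThenDouble = pipeSketch(addOne, double);
const result = addOneThenDouble(3); // (3 + 1) * 2 = 8
console.log(result);
```

An Observable's pipe method applies the same idea, except that each function in the chain takes an Observable and returns an Observable.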
The RxJS tap() operator is a utility operator that returns an Observable identical to the source Observable but performs a side effect for every emission from the source. The value returned by the callback passed to tap() is ignored.
A pipeable operator is a pure function that takes an Observable as its input and returns another Observable. The input Observable stays unmodified.
RxJS's of() is a creation function that builds an Observable from a sequence of values. According to the official docs, of() converts the arguments to an observable sequence. In Angular, of() is handy in many situations, such as returning a fallback value from catchError.
I tried the basic concept from what you have with this stackblitz and it worked. It cancelled the remaining operations. See the link below.
https://stackblitz.com/edit/angular-4ctwsd?file=src%2Fapp%2Fapp.component.ts
The differences I see between your code and mine are that I used throw and not throwError (is that something you wrote?), and that I'm just throwing the error, not returning a thrown error.
Here is the code for reference:
import { Component } from '@angular/core';
import { of, from } from 'rxjs';
import { map, catchError, tap, retry } from 'rxjs/operators';

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent {
  name = 'Angular 6';

  constructor() {
    of('a', 'b', 'c', 'd')
      .pipe(
        map(x => {
          if (x === 'c') {
            throw 'An error has occurred';
          }
          return x;
        }),
        tap(x => console.log('In tap: ', x)),
        retry(3),
        catchError(() => of('caught error!'))
      )
      .subscribe(x => console.log(x));
  }
}
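For comparison, here is a small sketch of the two variants side by side: returning throwError(...) from tap versus throwing inside tap. It assumes RxJS 6 style imports and replaces the HTTP calls from the question with of(null), so the source value and the messages are made up for illustration.

```typescript
import { Observable, of, throwError } from 'rxjs';
import { catchError, concatMap, tap } from 'rxjs/operators';

// Stand-in for an HTTP response that came back empty (assumption for the demo).
const source$: Observable<string | null> = of(null);

const returned: string[] = [];
const thrown: string[] = [];

// Variant 1: returning throwError(...) from tap. tap ignores the callback's
// return value, so the error Observable is never subscribed to and the null
// value flows on to concatMap.
source$
  .pipe(
    tap(evt => {
      if (evt == null) {
        return throwError(new Error('No data found...'));
      }
      return undefined;
    }),
    concatMap(evt => of(`next step received: ${evt}`)),
    catchError(err => of(`caught: ${err.message}`))
  )
  .subscribe(x => returned.push(x));

// Variant 2: throwing inside tap. The exception becomes an error notification,
// concatMap is skipped, and catchError handles it.
source$
  .pipe(
    tap(evt => {
      if (evt == null) {
        throw new Error('No data found...');
      }
    }),
    concatMap(evt => of(`next step received: ${evt}`)),
    catchError(err => of(`caught: ${err.message}`))
  )
  .subscribe(x => thrown.push(x));

console.log(returned); // [ 'next step received: null' ]
console.log(thrown);   // [ 'caught: No data found...' ]
```

If you would rather keep throwError, one option is to return it from an operator that subscribes to its result, such as concatMap, instead of from tap.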
You can also cancel/end a pipe by using a signal Subject and the RxJS operator takeUntil.
Example:
httpGetSafe(path: string): Observable<Data> {
  // Emits once to tell takeUntil to complete the stream.
  const stopSignal$ = new Subject<void>();
  return this.http.get<Data>(path).pipe(
    map(data => {
      const isBad = data === null;
      if (isBad) {
        stopSignal$.next();
      }
      return data;
    }),
    takeUntil(stopSignal$)
  );
}
Sometimes it's a bit simpler and more flexible than throwing errors...
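Here is a runnable sketch of the same stop-signal idea without the HTTP call, so you can see what the subscriber actually receives. The source values are made up, and of(1, 2, null, 4) stands in for whatever your real stream emits.

```typescript
import { of, Subject } from 'rxjs';
import { map, takeUntil } from 'rxjs/operators';

const stopSignal$ = new Subject<void>();
const events: string[] = [];

of(1, 2, null, 4)
  .pipe(
    map(value => {
      if (value === null) {
        // Signal takeUntil to complete the stream right away.
        stopSignal$.next();
      }
      return value;
    }),
    takeUntil(stopSignal$)
  )
  .subscribe({
    next: value => {
      events.push(String(value));
    },
    complete: () => {
      events.push('complete');
    }
  });

console.log(events); // [ '1', '2', 'complete' ]
```

Note that the stream completes rather than errors, so catchError and retry further down the pipe will not fire. Whether that is what you want depends on how the caller should react to missing data.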