Can anyone help with a scenario where this._getReactions$.next() stops working once this.http.get(...) returns an error? I want to keep the observable alive so it can take the next input.
private _getReactions$: Subject<any> = new Subject();

constructor() {
  this._getReactions$
    .pipe(
      switchMap(() => {
        // http request
        return this.http.get(...)
      }),
      catchError(error => {
        console.log(error);
        return empty();
      })
    )
    .subscribe(data => {
      console.log(data)
      // results handling
    });
}

onClick() {
  this._getReactions$.next();
}
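When an observable errors, it calls its error handler and is closed, and you can't send anything more through it. Everything upstream of that point, including the subscription to the source Subject, is torn down. Because catchError sits on the outer chain here, returning empty() replaces the whole chain with an observable that completes immediately, so later calls to next() no longer reach any subscriber.
What if we want the chain to stay alive?
Shielding the main observer chain is the solution: put the catchError inside the switchMap. Whenever a request is fired, switchMap creates the http observable, this time with the catchError attached. switchMap only completes when its source completes, so it does not care whether the inner observable completes; it keeps going and waits for the next value.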
constructor() {
  this._getReactions$
    .pipe(
      // tap is for side effects only; its return value is ignored
      tap(() => { this.loading = true; }),
      switchMap(() => {
        // http request, with the error caught on the inner observable
        return this.http.get(...).pipe(
          catchError((error) => this.handleError(error))
        )
      }),
    )
    .subscribe(data => {
      console.log(data)
      // results handling
      this.error = false;
      this.loading = false
    });
}
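private handleError(error: HttpErrorResponse) {
  this.error = true;
  console.log(error)
  this.loading = false
  // EMPTY (imported from 'rxjs') replaces the deprecated empty();
  // it completes only the inner observable, not the outer chain
  return EMPTY;
}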
PS: nesting the catchError within any other flattening operator, such as mergeMap, concatMap or exhaustMap, would also work.
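The difference between the two placements of catchError can be checked outside Angular. The following is only a minimal sketch using plain rxjs: fakeRequest is a hypothetical stand-in for this.http.get(...) that fails for odd ids, and run is a helper made up for this illustration.

```typescript
import { EMPTY, Observable, Subject } from 'rxjs';
import { catchError, switchMap } from 'rxjs/operators';

// Hypothetical stand-in for this.http.get(...): errors for odd ids,
// emits one value and completes for even ids.
function fakeRequest(id: number): Observable<string> {
  return new Observable<string>(subscriber => {
    if (id % 2 === 1) {
      subscriber.error(new Error(`request ${id} failed`));
    } else {
      subscriber.next(`response ${id}`);
      subscriber.complete();
    }
  });
}

// Pushes each id through a Subject and collects what the subscriber receives.
function run(catchInside: boolean, ids: number[]): string[] {
  const received: string[] = [];
  const trigger$ = new Subject<number>();

  const result$ = catchInside
    // catchError on the inner observable: only the failed request ends
    ? trigger$.pipe(
        switchMap(id => fakeRequest(id).pipe(catchError(() => EMPTY)))
      )
    // catchError on the outer chain: the whole chain is replaced by EMPTY
    : trigger$.pipe(
        switchMap(id => fakeRequest(id)),
        catchError(() => EMPTY)
      );

  result$.subscribe(value => received.push(value));
  ids.forEach(id => trigger$.next(id));
  return received;
}

const outerResult = run(false, [2, 1, 4]);
const innerResult = run(true, [2, 1, 4]);

console.log(outerResult); // [ 'response 2' ]
console.log(innerResult); // [ 'response 2', 'response 4' ]
```

With the outer catchError the failed request for id 1 ends the subscription, so id 4 is never requested. With the inner catchError only that one request is dropped and the chain keeps accepting input.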
I have made a way to handle this for all requests.
Create a loader file through which every request is executed:
loader.ts
import { Observable, Subject, Subscription, EMPTY } from 'rxjs';
import { catchError, switchMap, tap } from 'rxjs/operators';

// T1 is the request argument type, T is the response type
export class Loader<T1, T> {
  private _requestQueue: Subject<T1>;
  private _errorQueue: Subject<Error>;
  private _resultQueue: Observable<T>;
  private _loaded = false;

  constructor(loaderFunction: (arg: T1) => Observable<T>) {
    this._requestQueue = new Subject<T1>();
    this._errorQueue = new Subject<Error>();
    this._resultQueue = this._requestQueue.pipe(
      switchMap(arg => {
        this._loaded = false;
        return loaderFunction(arg).pipe(
          catchError(error => {
            this._loaded = true;
            this._errorQueue.next(error);
            // EMPTY completes only this inner observable;
            // the outer request stream stays alive
            return EMPTY;
          })
        );
      }),
      // Side effect only: mark the loader as loaded once a result arrives
      tap(() => {
        this._loaded = true;
      }),
    );
  }

  public load(arg?: T1): void {
    this._requestQueue.next(arg as T1);
  }

  public subscribe(successFn: (value: T) => any, errorFn?: (error: any) => void,
                   completeFn?: () => void): Subscription {
    const subscription = this._resultQueue.subscribe({ next: successFn, complete: completeFn });
    if (errorFn) {
      // errorFn is optional; tie the error subscription to the returned one
      // so that unsubscribing releases both
      subscription.add(this._errorQueue.subscribe(errorFn));
    }
    return subscription;
  }

  public complete() {
    this._requestQueue.complete();
    this._errorQueue.complete();
  }

  get loaded(): boolean {
    return this._loaded;
  }
}
In the other files, where you execute the requests, it is easy to use:
export class Component {
  // Type arguments follow Loader<T1, T>: request param type first, response type second
  readonly loader: Loader<RequestParamType, ResponseType>;

  constructor() {
    this.loader = new Loader(param => this.http.get(param));
    this.loader.subscribe(res => {
      // Your stuff
    }, (error) => {
      // Error handling stuff
    }, () => {
      // On complete stuff (optional)
    });
  }

  ngOnInit() {
    this.loadData();
  }

  loadData() { // Call this function whenever you want to refresh the data
    this.loader.load(params); // this param is passed directly to the http request
  }
}
I have also defined a few other members in the loader that can help you, such as the loading status (loaded) and an option to complete the stream (call complete() in ngOnDestroy).
Happy Coding!