
RxJs catch error and continue

I have a list of items to parse, but the parsing of one of them can fail.

What is the "Rx way" to catch the error but continue executing the sequence?

Code Sample:

var observable = Rx.Observable.from([0, 1, 2, 3, 4, 5])
  .map(function (value) {
    if (value == 3) {
      throw new Error("Value cannot be 3");
    }
    return value;
  });

observable.subscribe(
  function (value) {
    console.log("onNext " + value);
  },
  function (error) {
    console.log("Error: " + error.message);
  },
  function () {
    console.log("Completed!");
  }
);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>

What I want to do in a non-Rx-Way:

var items = [0, 1, 2, 3, 4, 5];

// for...of iterates the values; for...in would iterate the index strings instead
for (var item of items) {
  try {
    if (item == 3) {
      throw new Error("Value cannot be 3");
    }
    console.log(item);
  } catch (error) {
    console.log("Error: " + error.message);
  }
}

Thanks in advance

Cheborra asked Jul 29 '16 02:07



2 Answers

I would suggest using flatMap (now mergeMap in RxJS 5) instead, which lets you swallow errors you don't care about. Effectively, you create an inner Observable for each item, and that inner Observable can be discarded if an error occurs without terminating the outer sequence. The advantage of this approach is that you can chain operators together inside the inner pipeline, and an error thrown anywhere in it is automatically forwarded to the catch block.

const {from, iif, throwError, of, EMPTY} = rxjs;
const {map, flatMap, catchError} = rxjs.operators;

// A helper method to let us create arbitrary operators
const {pipe} = rxjs;

// Create an operator that will catch and squash errors
// This returns a function of the shape of Observable<T> => Observable<R>
const mapAndContinueOnError = pipe(
  // This will get skipped if upstream throws an error
  map(v => v * 2),
  catchError(err => {
    console.log("Caught Error, continuing");
    // Return an empty Observable which gets collapsed in the output
    return EMPTY;
  })
);

const observable = from([0, 1, 2, 3, 4, 5]).pipe(
  flatMap((value) =>
    iif(() => value != 3,
      of(value),
      throwError(new Error("Value cannot be 3"))
    ).pipe(mapAndContinueOnError)
  )
);

observable.subscribe(
  (value) => console.log("onNext " + value),
  (error) => console.log("Error: " + error.message),
  () => console.log("Completed!")
);
<script src="https://unpkg.com/[email protected]/dist/bundles/rxjs.umd.min.js"></script>
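For intuition, the flatMap-plus-EMPTY pattern above behaves much like `Array.prototype.flatMap` with a callback that returns an empty array for a failed item. The sketch below is only a plain-array analogy, not RxJS code; `parseItem` and `skipped` are hypothetical names made up for the illustration, and the doubling step mirrors the `map(v => v * 2)` in the snippet.

```javascript
// Plain-array analogy only; this is not RxJS code.
// parseItem is a hypothetical stand-in for the step that can fail.
function parseItem(value) {
  if (value === 3) {
    throw new Error("Value cannot be 3");
  }
  return value;
}

// Hypothetical log of the failures that were swallowed.
const skipped = [];

const doubled = [0, 1, 2, 3, 4, 5].flatMap((value) => {
  try {
    // Like the inner Observable: parse, then run the rest of the inner pipeline.
    return [parseItem(value) * 2];
  } catch (err) {
    // Like catchError returning EMPTY: this item contributes nothing to the output.
    skipped.push(err.message);
    return [];
  }
});

console.log(doubled); // [ 0, 2, 4, 8, 10 ]
console.log(skipped); // [ 'Value cannot be 3' ]
```

The failure stays local to the one item that caused it, which is the same reason the outer Observable keeps going: the error never leaves the inner pipeline.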
paulpdaniels answered Sep 24 '22 13:09


You need to switch to a new, disposable stream for each value. If an error occurs inside it, only that inner stream is disposed, and the original stream stays alive:

Rx.Observable.from([0, 1, 2, 3, 4, 5])
  .switchMap(value => {
    // This is the disposable stream!
    // Errors can safely occur in here without killing the original stream
    return Rx.Observable.of(value)
      .map(value => {
        if (value === 3) {
          throw new Error('Value cannot be 3');
        }
        return value;
      })
      .catch(error => {
        // You can do some fancy stuff here with errors if you like
        // Below we are just returning the error object to the outer stream
        return Rx.Observable.of(error);
      });
  })
  .map(value => {
    if (value instanceof Error) {
      // Maybe do some error handling here
      return `Error: ${value.message}`;
    }
    return value;
  })
  .subscribe(
    (x => console.log('Success', x)),
    (x => console.log('Error', x)),
    (() => console.log('Complete'))
  );
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.1/Rx.min.js"></script>
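Stripped of RxJS, the idea here is that a failure is turned into an ordinary value before it can reach the outer sequence, and a later step decides what to do with it. A minimal plain-JavaScript sketch of that idea follows; it is an analogy rather than RxJS code, and `parseValue` and `errorsAsValues` are hypothetical names standing in for the inner stream and its catch handler.

```javascript
// Plain-array analogy only; this is not RxJS code.
// parseValue is a hypothetical stand-in for the step that can fail.
function parseValue(value) {
  if (value === 3) {
    throw new Error("Value cannot be 3");
  }
  return value;
}

// Hypothetical helper: wrap fn so that a thrown Error is returned as a value
// instead of propagating, like the catch inside the inner stream.
function errorsAsValues(fn) {
  return (value) => {
    try {
      return fn(value);
    } catch (error) {
      return error;
    }
  };
}

const labelled = [0, 1, 2, 3, 4, 5]
  .map(errorsAsValues(parseValue))
  // Like the outer map: Error objects arrive as normal items and are handled here.
  .map((value) => (value instanceof Error ? `Error: ${value.message}` : value));

console.log(labelled); // [ 0, 1, 2, 'Error: Value cannot be 3', 4, 5 ]
```

Unlike dropping the failed item, this keeps one output per input, so the position of the failure is still visible downstream.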

More info on this technique is in the blog post "The Quest for Meatballs: Continue RxJS Streams When Errors Occur".

iamturns answered Sep 26 '22 13:09