
RxJS: Continue with concat subscribe after error

I have an array of observables that need to fire off sequentially. When one of them errors, I need to catch the error, log it, and continue with the remaining observables.

At the moment, once an error occurs, the observer stops. It's essential that the observer continues and does not restart or complete on error.

import * as Rx from "rxjs";
const source = [
  Rx.Observable.from("1").delay(200),
  Rx.Observable.from("2").delay(150),
  Rx.Observable.throw("error"),
  Rx.Observable.from("3").delay(124),
  Rx.Observable.from("4").delay(201),
];
let sSource = Rx.Observable.concat(...source);
sSource.subscribe((v) => {console.log(v)}, (e) => {console.log(e)});

Current output:

1
2
error

Expected output:

1
2
error
3
4

The only solution we could come up with was to loop over the source observables beforehand and add a catch handler to each one individually. That way, an error is handled inside its own observable, and the concatenated observable continues instead of terminating.

We feel like there should be a more elegant solution to this. I'll post the solution we have at the moment if need be.

Zander Rootman asked Jun 20 '18 22:06


2 Answers

You can apply the catch operator to each of the source observables and can perform the error logging within it. Like this:

const sources = [
  Rx.Observable.from("1").delay(200),
  Rx.Observable.from("2").delay(150),
  Rx.Observable.throw("error"),
  Rx.Observable.from("3").delay(124),
  Rx.Observable.from("4").delay(201),
];
const sourcesWithCatch = sources.map(s => s.catch(e => {
  console.log(e);
  return Rx.Observable.empty();
}));
const concatted = Rx.Observable.concat(...sourcesWithCatch);
concatted.subscribe(v => console.log(v));
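
To see why a catch on each source works while an error callback on the final subscription does not, here is a dependency-free toy model in plain JavaScript. It is a sketch only: it is not RxJS and not how RxJS is implemented, and the helpers of, throwError, concat, continueOnError, and record are hypothetical names that mimic just one part of the contract, namely that an error ends a sequence and concat only moves on when a source completes.

```javascript
// Toy model, not RxJS: a "source" is a function that takes an observer
// ({ next, error, complete }) and pushes notifications into it.

// Emits the given values, then completes.
const of = (...values) => (observer) => {
  values.forEach((v) => observer.next(v));
  observer.complete();
};

// Emits a single error notification and nothing else.
const throwError = (err) => (observer) => observer.error(err);

// Subscribes to the sources one after another. It moves to the next source
// only on complete; an error is forwarded and ends the whole sequence.
const concat = (...sources) => (observer) => {
  const subscribeTo = (i) => {
    if (i === sources.length) {
      observer.complete();
      return;
    }
    sources[i]({
      next: (v) => observer.next(v),
      error: (e) => observer.error(e),
      complete: () => subscribeTo(i + 1),
    });
  };
  subscribeTo(0);
};

// Wraps one source so that an error is handed to onError and then turned
// into a completion, which is what lets concat continue.
const continueOnError = (source, onError) => (observer) =>
  source({
    next: (v) => observer.next(v),
    error: (e) => {
      onError(e);
      observer.complete();
    },
    complete: () => observer.complete(),
  });

// Collects every notification a source produces, for inspection.
const record = (source) => {
  const log = [];
  source({
    next: (v) => log.push(v),
    error: (e) => log.push(`error: ${e}`),
    complete: () => log.push("complete"),
  });
  return log;
};

const sources = [of("1"), of("2"), throwError("boom"), of("3"), of("4")];

// Without per-source handling, the error ends the concatenation.
const plain = record(concat(...sources));

// With per-source handling, the error is logged and the rest still runs.
const logged = [];
const guarded = record(
  concat(...sources.map((s) => continueOnError(s, (e) => logged.push(e))))
);

console.log(plain);
console.log(guarded);
console.log(logged);
```

In this model, plain ends at the error (1, 2, error: boom), while guarded runs through 1, 2, 3, 4 and completes, with boom collected in logged. The same reasoning applies to the mapped catch above: the error has to be converted into a completion before it reaches concat.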
cartant answered Sep 20 '22 04:09


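For future reference, RxJS 5 has the onErrorResumeNext function, which behaves like the Visual Basic On Error Resume Next statement: when a source errors, it moves on to the next one. Note that it swallows the error silently, so if you need to log it you still have to handle it in each source.

This is the sample from the documentation. It was written for RxJS 4, which has Rx.Observable.just; RxJS 5 names that function Rx.Observable.of, so the calls below use of:

var source = Rx.Observable.onErrorResumeNext(
  Rx.Observable.of(42),
  Rx.Observable.throw(new Error()),
  Rx.Observable.of(56),
  Rx.Observable.throw(new Error()),
  Rx.Observable.of(78)
);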

var subscription = source.subscribe(
  data => console.log(data)
);
// => 42
// => 56
// => 78
Yennefer answered Sep 21 '22 04:09