Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Do not close Observable after error

I know that this question can appear pretty simple, but the resources on the web about Observables and error handling aren't so great (or maybe I'm just bad at searching).

I have a http request that returns an Observable of Response, these Responses can contain either data or an error message. If it contains data I want to extract it and parse it, if it contains the error message I want to skip all the other operators (about the parsing) and execute the error function in the subscriber.

I can do all this things throwing an Error:

http.get(...).
...
.do(res=>{
  if(res.error) throw new Error(res.error.message);
  return res;
})

and it works, it skips all the operators and it execute the error function. The problem is that, after the error, the Subscriber stops and doesn't accept data anymore.

If I analyze the Subscriber after the error, I notice that the properties closed and isStopped are set to true both. I want to prevent this, I want to keep the Observable active also after the errors. How can I do it?

Thank you

like image 997
Cristian Traìna Avatar asked Mar 15 '17 19:03

Cristian Traìna


People also ask

What happens if you don't subscribe to an observable?

Remember, observables are lazy. If you don't subscribe nothing is going to happen. It's good to know that when you subscribe to an observer, each call of subscribe() will trigger it's own independent execution for that given observer. Subscribe calls are not shared among multiple subscribers to the same observable.

Which RxJS operator can be chained to an observable to handle errors?

The catchError operator takes as input an Observable that might error out, and starts emitting the values of the input Observable in its output Observable.

What is catchError?

RxJS catchError() operator is an error-handling operator used to handle and take care of catching errors on the source observable by returning a new observable or an error.


2 Answers

As mentioned in previous answers, this is the standard behaviour, guaranteed by contract of Rxjs observables. If you want to escape the contracts, you can materialize your source, in the case of which, instead of dealing with messages streamed through your observable, you will deal with meta-messages (called notifications, of the three types you expect, next, error, complete). There is no contract on meta-messages, which put the burden on you to create your error, and complete messages by hand, or use dematerialize to return back to the normal behaviour.

Generally speaking, observables are great for dataflow, while control flow is generally arduous (jumping, looping, conditional branching, etc.).

See the Notification documentation here: http://reactivex.io/rxjs/class/es6/Notification.js~Notification.html.

See the materialize documentation here: http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-materialize

So if you want to deal with errors without stopping your source observable, you can do something like that:

function process(source, success, error) {
  return source.materialize()
    .map(function(notification){
      const value = notification.value; // that is the actual message data
      if (notification.kind === 'N') {
        // that is a next message, we keep it a next message
        return Notification.createNext(success(value));
      }
      if (notification.kind === 'E') {
        // that is a error message, we turned into a next message
        return Notification.createNext(error(value));
      }
      if (notification.kind === 'C') {
        // that is a completed message, we keep it a completed message
        return Notification.createComplete();
      }
    })
   .dematerialize()
}

You can then use this function with your source observable and passing it the success and error function which process the streamed value. As you can see, the technique here is to turn an error message into a normal message, and then revert back to normal.

This is done from the back of my head so untested, so keep me updated if that was helpful to you. I used a similar technique many times with Rxjs v4, I believe it should translate to Rxjs v5 in a straight forward manner.

like image 112
user3743222 Avatar answered Oct 28 '22 22:10

user3743222


What you're asking is literally breaking the contract of the Observable. An Observable is defined as a producer of 0 or more values, and if the producer is finished producing values, it will call the complete handler on its observer, and if there's an error, it will call error.

But once it calls complete OR error, its contract is that it will never call next on its observer again.

There are ways around this - by composing your observables correctly, you can resume your stream with a new observable. But without additional context, it's hard to guide you further. The basic idea is around using the .catch operator in your observable chain to create a new observable on error, but again, additional context would be needed to provide an example.

Edit:

As @cartant helpfully added, see the observable contract

like image 23
snorkpete Avatar answered Oct 28 '22 21:10

snorkpete