Handling Exceptions in Reactive Extensions without stopping sequence

Rx uses the grammar OnNext* (OnError|OnCompleted)? rather than (OnNext|OnError)* OnCompleted?. Why? This is quite clear from an implementation perspective (it also shares semantics with IEnumerable and yield), but I think it differs from real-life situations. In real life, producers generate a mixed stream of data and exceptions, and an exception doesn't stop the producer.

The question: if I understand correctly, the only possible solution is to make the observable return a composite data structure that combines the original data with any exception produced (Observable.Timestamp() and .TimeInterval() follow a similar concept). Or are there other options?


At the moment I have come to the following solution: inside the observable producer I handle exceptions manually and pass them along in the following data structure, which is the element type of my observable:

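// Carries either a produced value or the exception caught while producing it.
public class ValueOrException<T>
{
    private readonly Exception ex;
    private readonly T value;

    // Use when an exception was caught; value holds whatever was produced.
    public ValueOrException(T value, Exception ex)
    {
        this.value = value;
        this.ex = ex;
    }

    // Use when the value was produced without error; Ex stays null.
    public ValueOrException(T value)
    {
        this.value = value;
    }

    public T Value
    {
        get { return this.value; }
    }

    // Null when no exception occurred.
    public Exception Ex
    {
        get { return this.ex; }
    }
}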
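To make the idea concrete, here is a minimal sketch of a producer that keeps going after a failure by reporting the exception through OnNext instead of OnError. It uses only the IObservable<T>/IObserver<T> interfaces from the base class library, not the Rx package. ParsingProducer, CollectingObserver and the string-parsing scenario are hypothetical names invented for illustration, and the ValueOrException<T> copy is condensed so the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;

// Condensed copy of the type above so this sketch compiles on its own.
public class ValueOrException<T>
{
    public ValueOrException(T value, Exception ex) { Value = value; Ex = ex; }
    public ValueOrException(T value) : this(value, null) { }
    public T Value { get; }
    public Exception Ex { get; }
}

// Hypothetical producer: parses each input and reports failures as data.
public class ParsingProducer : IObservable<ValueOrException<int>>
{
    private readonly IEnumerable<string> inputs;

    public ParsingProducer(IEnumerable<string> inputs) { this.inputs = inputs; }

    public IDisposable Subscribe(IObserver<ValueOrException<int>> observer)
    {
        foreach (var text in inputs)
        {
            ValueOrException<int> item;
            try
            {
                item = new ValueOrException<int>(int.Parse(text));
            }
            catch (FormatException ex)
            {
                // The failure becomes an ordinary element; OnError is never called.
                item = new ValueOrException<int>(default(int), ex);
            }
            // OnNext is called outside the try so observer exceptions are not swallowed.
            observer.OnNext(item);
        }
        observer.OnCompleted();
        return new NoopDisposable();
    }

    private sealed class NoopDisposable : IDisposable { public void Dispose() { } }
}

// Hypothetical consumer that simply records what it receives.
public class CollectingObserver : IObserver<ValueOrException<int>>
{
    public List<ValueOrException<int>> Items { get; } = new List<ValueOrException<int>>();
    public bool Completed { get; private set; }
    public bool Faulted { get; private set; }

    public void OnNext(ValueOrException<int> item) { Items.Add(item); }
    public void OnError(Exception error) { Faulted = true; }
    public void OnCompleted() { Completed = true; }
}
```

Subscribing a CollectingObserver to new ParsingProducer(new[] { "1", "oops", "3" }) delivers three elements followed by OnCompleted; the second element has a non-null Ex, and the sequence is not terminated by it.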
arena-ru asked Nov 09 '12 15:11


2 Answers

If the consumer is the one that knows whether the exception is expected or not, then I'd say that throwing exceptions is incorrect here.

You're essentially trying to convey state or logic, which exceptions are not meant to do. Exceptions are meant to bring the system (or at the very least, the current operation) to a grinding halt because something has happened which the code doesn't inherently know how to recover from.

In this case, it just so happens that an Exception is part of your business logic/state, but that doesn't mean you should throw it. You need to pass it downstream to your consumer and then have your consumer process it.

A Tuple<T, Exception> would work in a pinch here, but the lack of specificity around the type (as well as its properties) is usually messy, so I'd recommend creating a dedicated type that conveys the results of your operation and exposing it through the IObservable<T>.
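As a rough sketch of what such a dedicated type might look like, the class below makes the two cases explicit with factory methods instead of relying on a null check by convention. OperationResult<T> and all of its member names are hypothetical; this is one possible shape, not a type from Rx or the framework.

```csharp
using System;

// Hypothetical result type; every name here is illustrative only.
public sealed class OperationResult<T>
{
    private readonly T value;

    private OperationResult(T value, Exception error)
    {
        this.value = value;
        Error = error;
    }

    // Wraps a successfully produced value.
    public static OperationResult<T> Success(T value)
    {
        return new OperationResult<T>(value, null);
    }

    // Wraps the exception that prevented a value from being produced.
    public static OperationResult<T> Failure(Exception error)
    {
        if (error == null) throw new ArgumentNullException(nameof(error));
        return new OperationResult<T>(default(T), error);
    }

    public bool IsSuccess { get { return Error == null; } }

    // Null for a successful result.
    public Exception Error { get; }

    // Reading Value on a failed result is treated as a programming error.
    public T Value
    {
        get
        {
            if (!IsSuccess)
                throw new InvalidOperationException("No value: the operation failed.", Error);
            return value;
        }
    }
}
```

The sequence would then be an IObservable<OperationResult<T>>, and the consumer checks IsSuccess on each element to decide whether a given failure is expected.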

casperOne answered Oct 02 '22 15:10


OnError is meant to signal that an unrecoverable error occurred, so the stream should be reset. After recovery, subsequent events don't make sense in the context of the events that happened before the exception.

For example, consider the stream of events:

  • User joined the chat
  • User joined the chat
  • !! Chat crashed so all users are kicked
  • User joined the chat

Now suppose the stream consumer uses the Aggregate function to display the number of chatters. What should be displayed at the end? Of course 1, not 3. We should start counting from scratch after the exception.
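The chat scenario can be sketched with the plain IObserver<T> interface. To stay self-contained, the example drives the observers by hand rather than through a real observable or the Rx Aggregate operator; ChatterCounter, ChatDemo and the "joined" event string are hypothetical names. The point it illustrates is that the state accumulated before OnError belongs to the terminated subscription, and a new subscription starts from zero.

```csharp
using System;

// Hypothetical observer that counts "joined" events for a single subscription.
public sealed class ChatterCounter : IObserver<string>
{
    public int Count { get; private set; }
    public bool Faulted { get; private set; }
    public bool Completed { get; private set; }

    public void OnNext(string evt)
    {
        if (evt == "joined") Count++;
    }

    public void OnError(Exception error) { Faulted = true; }

    public void OnCompleted() { Completed = true; }
}

public static class ChatDemo
{
    // Replays the four events from the list above and returns the final count.
    public static int Run()
    {
        // First subscription: two users join, then the chat crashes.
        var first = new ChatterCounter();
        first.OnNext("joined");
        first.OnNext("joined");
        first.OnError(new InvalidOperationException("Chat crashed"));

        // After recovery the consumer subscribes again with fresh state.
        var second = new ChatterCounter();
        second.OnNext("joined");
        second.OnCompleted();

        return second.Count;
    }
}
```

ChatDemo.Run() returns 1: the two joins counted before the crash are discarded along with the faulted subscription.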

Bartłomiej Szypelow answered Oct 02 '22 16:10