
Handling errors in an observable sequence using Rx

Is there a way to have an observable sequence resume execution with the next element in the sequence if an error occurs? From this post it looks like you need to specify a new observable sequence in Catch() to resume execution, but what if you just need to continue processing with the next element in the sequence instead? Is there a way to achieve this?

UPDATE: The scenario is as follows: I have a bunch of elements that I need to process. The processing is made up of a bunch of steps, which I have decomposed into tasks that I would like to compose. I followed the guidelines for ToObservable() posted here to convert my tasks to observables for composition, so basically I'm doing something like this:

foreach (var element in collection)
{
    var result = from aResult in DoAAsync(element).ToObservable()
                 from bResult in DoBAsync(aResult).ToObservable()
                 from cResult in DoCAsync(bResult).ToObservable()
                 select cResult;
    result.Subscribe(/* register OnNext and OnError handlers here */);
}

or I could do something like this:

var result =
    from element in collection.ToObservable()
    from aResult in DoAAsync(element).ToObservable()
    from bResult in DoBAsync(aResult).ToObservable()
    from cResult in DoCAsync(bResult).ToObservable()
    select cResult;

What is the best way to continue processing the other elements even if, say, the processing of one element throws an exception? Ideally, I would like to log the error and move on.

Abhijeet Patel asked May 19 '11 01:05


2 Answers

Both James & Richard made some good points, but I don't think they have given you the best method for solving your problem.

James suggested using .Catch(Observable.Never<Unit>()). He was wrong when he said that "will ... allow the stream to continue" because once you hit an exception the stream must end - that is what Richard pointed out when he mentioned the contract between observers and observables.

Also, using Never in this way will cause your observables to never complete.

The short answer is that .Catch(Observable.Empty<Unit>()) is the correct way to change a sequence from one that ends with an error to one that ends with completion.
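
As a minimal sketch of that claim (assuming the System.Reactive package; the failing sequence and the class name CatchEmptyDemo are made up for illustration), the built-in Materialize operator can be used to record what an observer sees once Catch has swapped the error for an empty sequence:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

public static class CatchEmptyDemo
{
    // Returns the notifications an observer would see, in order.
    public static List<string> Run()
    {
        // Illustrative sequence: yields 1 and 2, then fails.
        IObservable<int> failing =
            Observable.Range(1, 2)
                .Concat(Observable.Throw<int>(new InvalidOperationException("boom")));

        // Catch swaps the error for an empty sequence, so the stream
        // now ends with OnCompleted instead of OnError.
        IList<Notification<int>> seen = failing
            .Catch(Observable.Empty<int>())
            .Materialize()   // built-in operator: turn each notification into a value
            .ToList()
            .Wait();         // block until the sequence has finished

        var log = new List<string>();
        foreach (var n in seen)
        {
            log.Add(n.Kind == NotificationKind.OnNext
                ? "OnNext(" + n.Value + ")"
                : n.Kind.ToString());
        }
        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

The recorded notifications should be OnNext(1), OnNext(2), OnCompleted. Swapping Observable.Empty for Observable.Never in this sketch would leave Wait() blocked indefinitely, which is the "never complete" problem mentioned above.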

You've hit on the right idea of using SelectMany to process each value of the source collection so that you can catch each exception, but you're left with a couple of issues.

You're using tasks (TPL) just to turn a function call into an observable. This forces your observable onto thread-pool threads, which means that the SelectMany statement will likely produce values in a non-deterministic order.

Also, you hide the actual calls that process your data, making refactoring and maintenance harder.

I think you're better off creating an extension method that allows the exceptions to be skipped. Here it is:

public static IObservable<R> SelectAndSkipOnException<T, R>(
    this IObservable<T> source, Func<T, R> selector)
{
    return
        source
            .Select(t =>
                Observable.Start(() => selector(t)).Catch(Observable.Empty<R>()))
            .Merge();
}

With this method you can now simply do this:

var result =
    collection.ToObservable()
        .SelectAndSkipOnException(t =>
        {
            var a = DoA(t);
            var b = DoB(a);
            var c = DoC(b);
            return c;
        });
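
To see the skipping behaviour end to end, here is a small self-contained sketch, assuming the System.Reactive package. The Process step and the SkipDemo class are hypothetical stand-ins for DoA/DoB/DoC; the extension method is repeated only so the example compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class SkipExtensions
{
    // Same operator as above: run the selector per element and
    // replace a failed element with an empty sequence.
    public static IObservable<R> SelectAndSkipOnException<T, R>(
        this IObservable<T> source, Func<T, R> selector)
    {
        return source
            .Select(t => Observable.Start(() => selector(t)).Catch(Observable.Empty<R>()))
            .Merge();
    }
}

public static class SkipDemo
{
    // Hypothetical processing step: fails for the value 3.
    static int Process(int x)
    {
        if (x == 3) throw new InvalidOperationException("cannot process 3");
        return x * 10;
    }

    public static IList<int> Run()
    {
        return new[] { 1, 2, 3, 4 }
            .ToObservable()
            .SelectAndSkipOnException(x => Process(x))
            .ToList()
            .Wait();   // block until every element has been handled
    }

    public static void Main()
    {
        // Observable.Start runs each call on a scheduler thread, so the
        // arrival order is not guaranteed; sort before printing.
        Console.WriteLine(string.Join(", ", Run().OrderBy(x => x)));
    }
}
```

The element 3 is dropped silently and the other three results (10, 20 and 40) still arrive, though not necessarily in source order.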

This code is much simpler, but it hides the exception(s). If you want to hang on to the exceptions while letting your sequence continue then you need to do some extra funkiness. Adding a couple of overloads to the Materialize extension method works to keep the errors.

public static IObservable<Notification<R>> Materialize<T, R>(
    this IObservable<T> source, Func<T, R> selector)
{
    return source.Select(t => Notification.CreateOnNext(t)).Materialize(selector);
}

public static IObservable<Notification<R>> Materialize<T, R>(
    this IObservable<Notification<T>> source, Func<T, R> selector)
{
    Func<Notification<T>, Notification<R>> f = nt =>
    {
        if (nt.Kind == NotificationKind.OnNext)
        {
            try
            {
                return Notification.CreateOnNext<R>(selector(nt.Value));
            }
            catch (Exception ex)
            {
                ex.Data["Value"] = nt.Value;
                ex.Data["Selector"] = selector;
                return Notification.CreateOnError<R>(ex);
            }
        }
        else
        {
            if (nt.Kind == NotificationKind.OnError)
            {
                return Notification.CreateOnError<R>(nt.Exception);
            }
            else
            {
                return Notification.CreateOnCompleted<R>();
            }
        }
    };
    return source.Select(nt => f(nt));
}

These methods allow you to write this:

var result =
    collection
        .ToObservable()
        .Materialize(t =>
        {
            var a = DoA(t);
            var b = DoB(a);
            var c = DoC(b);
            return c;
        })
        .Do(nt =>
        {
            if (nt.Kind == NotificationKind.OnError)
            {
                /* Process the error in `nt.Exception` */
            }
        })
        .Where(nt => nt.Kind != NotificationKind.OnError)
        .Dematerialize();

You can even chain these Materialize methods and use ex.Data["Value"] and ex.Data["Selector"] to recover the value and the selector function that threw the error.
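
A sketch of what such chaining might look like, assuming the System.Reactive package. StepA, StepB and MaterializeDemo are hypothetical names, and the two overloads are restated in condensed form only so that the example is self-contained:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

public static class MaterializeExtensions
{
    public static IObservable<Notification<R>> Materialize<T, R>(
        this IObservable<T> source, Func<T, R> selector)
    {
        return source.Select(t => Notification.CreateOnNext(t)).Materialize(selector);
    }

    public static IObservable<Notification<R>> Materialize<T, R>(
        this IObservable<Notification<T>> source, Func<T, R> selector)
    {
        return source.Select(nt =>
        {
            // Earlier failures pass through untouched.
            if (nt.Kind == NotificationKind.OnError)
                return Notification.CreateOnError<R>(nt.Exception);
            if (nt.Kind == NotificationKind.OnCompleted)
                return Notification.CreateOnCompleted<R>();
            try
            {
                return Notification.CreateOnNext<R>(selector(nt.Value));
            }
            catch (Exception ex)
            {
                ex.Data["Value"] = nt.Value;      // the input that failed
                ex.Data["Selector"] = selector;   // the step that failed
                return Notification.CreateOnError<R>(ex);
            }
        });
    }
}

public static class MaterializeDemo
{
    // Hypothetical steps: each one rejects a single input.
    static int StepA(int x)
    {
        if (x == 2) throw new ArgumentException("StepA rejects 2");
        return x * 10;
    }

    static string StepB(int a)
    {
        if (a == 30) throw new ArgumentException("StepB rejects 30");
        return "item-" + a;
    }

    // Returns the successful results; failed inputs are appended to failedInputs.
    public static IList<string> Run(List<object> failedInputs)
    {
        return new[] { 1, 2, 3, 4 }
            .ToObservable()
            .Materialize(x => StepA(x))
            .Materialize(a => StepB(a))
            .Do(nt =>
            {
                if (nt.Kind == NotificationKind.OnError)
                    failedInputs.Add(nt.Exception.Data["Value"]);
            })
            .Where(nt => nt.Kind != NotificationKind.OnError)
            .Dematerialize()
            .ToList()
            .Wait();
    }

    public static void Main()
    {
        var failed = new List<object>();
        Console.WriteLine(string.Join(", ", Run(failed)));
        Console.WriteLine("failed inputs: " + string.Join(", ", failed));
    }
}
```

Here 2 fails in the first step and 30 (the result for 3) fails in the second, so the surviving results are item-10 and item-40, and ex.Data["Value"] identifies the input each failing step received.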

I hope this helps.

Enigmativity answered Sep 20 '22 22:09


The contract between IObservable and IObserver is OnNext*(OnCompleted|OnError)? which is upheld by all operators, even if not by the source.
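
To illustrate, here is a small sketch (assuming the System.Reactive package; the misbehaving source and the ContractDemo class are invented for the example) in which a source keeps calling the observer after OnError, and Rx drops everything that follows the terminal notification:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class ContractDemo
{
    // Returns the kinds of notification that actually reach an observer.
    public static List<NotificationKind> Run()
    {
        // A badly behaved source: it keeps talking after OnError.
        IObservable<int> rude = Observable.Create<int>(observer =>
        {
            observer.OnNext(1);
            observer.OnError(new InvalidOperationException("boom"));
            observer.OnNext(2);       // violates the contract
            observer.OnCompleted();   // so does this
            return Disposable.Empty;
        });

        var kinds = new List<NotificationKind>();
        foreach (var n in rude.Materialize().ToList().Wait())
            kinds.Add(n.Kind);
        return kinds;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Only OnNext and OnError are recorded; the later OnNext(2) and OnCompleted calls never reach the subscriber.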

Your only choice is to re-subscribe to the source using Retry, but if the source returns the same IObservable instance for every subscription you won't see any new values.
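
A rough sketch of that limitation, assuming the System.Reactive package (the source below and the RetryDemo class are illustrative only): a cold source that fails at the same element on every subscription simply replays its earlier values when retried.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

public static class RetryDemo
{
    public static List<string> Run()
    {
        // A cold source that fails at the same element on every subscription.
        IObservable<int> source = Observable.Range(1, 3).Select(x =>
        {
            if (x == 3) throw new InvalidOperationException("cannot process 3");
            return x;
        });

        var log = new List<string>();
        // Retry re-subscribes after an error; the count caps the number of attempts.
        foreach (var n in source.Retry(2).Materialize().ToList().Wait())
        {
            log.Add(n.Kind == NotificationKind.OnNext
                ? n.Value.ToString()
                : n.Kind.ToString());
        }
        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Each attempt replays 1 and 2 before failing again, and the sequence still ends with OnError once the attempts run out, so Retry on its own does not get you past the failing element.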

Could you supply more information on your scenario? Maybe there is another way of looking at it.

Edit: Based on your updated feedback, it sounds like you just need Catch:

var result = 
    from element in collection.ToObservable() 
    from aResult in DoAAsync(element).ToObservable().Log().Catch(Observable.Empty<TA>())
    from bResult in DoBAsync(aResult).ToObservable().Log().Catch(Observable.Empty<TB>()) 
    from cResult in DoCAsync(bResult).ToObservable().Log().Catch(Observable.Empty<TC>())
    select cResult;

This replaces an error with an empty sequence, which does not trigger the next step (since the query uses SelectMany under the hood).
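
Log() is not a built-in Rx operator. One way it could be written is sketched below, assuming the System.Reactive package: the Log extension, the errors queue, and the three DoXAsync steps are all hypothetical and exist only to make the query runnable.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class LogExtensions
{
    // Hypothetical Log operator: records the error message and
    // passes every notification through unchanged.
    public static IObservable<T> Log<T>(
        this IObservable<T> source, ConcurrentQueue<string> errors)
    {
        return source.Do(_ => { }, ex => errors.Enqueue(ex.Message));
    }
}

public static class LogAndSkipDemo
{
    // Hypothetical async steps; DoBAsync fails for the input 20.
    static Task<int> DoAAsync(int x) { return Task.Run(() => x * 10); }

    static Task<int> DoBAsync(int a)
    {
        return Task.Run(() =>
        {
            if (a == 20) throw new InvalidOperationException("B failed for " + a);
            return a + 1;
        });
    }

    static Task<string> DoCAsync(int b) { return Task.Run(() => "c" + b); }

    public static IList<string> Run(ConcurrentQueue<string> errors)
    {
        var result =
            from element in new[] { 1, 2, 3 }.ToObservable()
            from a in DoAAsync(element).ToObservable().Log(errors).Catch(Observable.Empty<int>())
            from b in DoBAsync(a).ToObservable().Log(errors).Catch(Observable.Empty<int>())
            from c in DoCAsync(b).ToObservable().Log(errors).Catch(Observable.Empty<string>())
            select c;

        return result.ToList().Wait();   // block until all elements are done
    }

    public static void Main()
    {
        var errors = new ConcurrentQueue<string>();
        var results = new List<string>(Run(errors));
        results.Sort(StringComparer.Ordinal);   // tasks may finish in any order
        Console.WriteLine(string.Join(", ", results));
        Console.WriteLine("errors logged: " + errors.Count);
    }
}
```

The element whose B step fails is logged once and produces no C step, while the other two elements flow through to c11 and c31. A thread-safe collection is used for the log because the tasks may complete on different threads.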

Richard Szalay answered Sep 17 '22 22:09
