Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Exception handling in observable pipeline

I have created an observable that consists of an item being transformed to another by running an async method.

IObservable<Summary> obs = scanner.Scans
                    .SelectMany(b => GetAssignment(b))
                    .SelectMany(b => VerifyAssignment(b))
                    .SelectMany(b => ConfirmAssignmentData(b))
                    .SelectMany(b => UploadAsset(b))
                    .Select(assignment => new Summary())
                    .Catch(LogException());

I would like to make this fail-proof, so in case an exception is thrown during the processing, I should log the exception, but ignore the exception and resume with the next scan (the next item pushed by scanner.Scans)

The current code catches any exception, but the sequence finished as soon as an exception is thrown.

How can I make it "swallow" the exception (logging it), but to resume with the next item?

like image 915
SuperJMN Avatar asked May 13 '21 11:05

SuperJMN


People also ask

How do you perform error handling in observables?

Catch errors in the observable stream Another option to catch errors is to use the CatchError Operator. The CatchError Operators catches the error in the observable stream as and when the error happens. This allows us to retry the failed observable or use a replacement observable.

Which operator is used to forward a failed Observable to the error handler?

The catchError operator is going to take the error and pass it to the error handling function. That function is expected to return an Observable which is going to be a replacement Observable for the stream that just errored out.

How do you handle exceptions in an observable sequence?

The exception handler function, producing another observable sequence. An observable sequence containing the source sequence's elements, followed by the elements produced by the handler's resulting observable sequence in case an exception occurred.

What are the exceptions in a MIPS pipeline?

The exceptions that can occur in a MIPS pipeline are: • IF – Page fault, misaligned memory access, memory protection violation • MEM – Page fault on data, misaligned memory access, memory protection violation In MIPS, exceptions are managed by a System Control Coprocessor (CP0). It saves the PC of the offending or interrupted instruction.

What is handlehandle exception only RxJava?

Handle Exception Only RxJava also provides a fallback method that allows continuing the sequence with a provided Observable when an exception (but no error) is raised: As the code above shows, when an error does occur, the onExceptionResumeNext won't kick in to resume the sequence.

What is the fallback method for exception handling in RxJava?

RxJava also provides a fallback method that allows continuing the sequence with a provided Observable when an exception (but no error) is raised: As the code above shows, when an error does occur, the onExceptionResumeNext won't kick in to resume the sequence.


Video Answer


1 Answers

Rx is a functional paradigm so it's very useful to use a functional approach to solving this problem.

The answer is to introduce another monad that can cope with errors, like Nullable<T> can cope with integers having a null value, but in this case a class that can either represent a value or an exception.

public class Exceptional
{
    public static Exceptional<T> From<T>(T value) => new Exceptional<T>(value);
    public static Exceptional<T> From<T>(Exception ex) => new Exceptional<T>(ex);
    public static Exceptional<T> From<T>(Func<T> factory) => new Exceptional<T>(factory);
}

public class Exceptional<T>
{
    public bool HasException { get; private set; }
    public Exception Exception { get; private set; }
    public T Value { get; private set; }

    public Exceptional(T value)
    {
        this.HasException = false;
        this.Value = value;
    }

    public Exceptional(Exception exception)
    {
        this.HasException = true;
        this.Exception = exception;
    }

    public Exceptional(Func<T> factory)
    {
        try
        {
            this.Value = factory();
            this.HasException = false;
        }
        catch (Exception ex)
        {
            this.Exception = ex;
            this.HasException = true;
        }
    }

    public override string ToString() =>
        this.HasException
            ? this.Exception.GetType().Name
            : (this.Value != null ? this.Value.ToString() : "null");
}


public static class ExceptionalExtensions
{
    public static Exceptional<T> ToExceptional<T>(this T value) => Exceptional.From(value);

    public static Exceptional<T> ToExceptional<T>(this Func<T> factory) => Exceptional.From(factory);

    public static Exceptional<U> Select<T, U>(this Exceptional<T> value, Func<T, U> m) =>
        value.SelectMany(t => Exceptional.From(() => m(t)));

    public static Exceptional<U> SelectMany<T, U>(this Exceptional<T> value, Func<T, Exceptional<U>> k) =>
        value.HasException ? Exceptional.From<U>(value.Exception) : k(value.Value);

    public static Exceptional<V> SelectMany<T, U, V>(this Exceptional<T> value, Func<T, Exceptional<U>> k, Func<T, U, V> m) =>
        value.SelectMany(t => k(t).SelectMany(u => Exceptional.From(() => m(t, u))));
}

So, let's start by creating an Rx query that throws an exception.

IObservable<int> query =
    Observable
        .Range(0, 10)
        .Select(x => 5 - x)
        .Select(x => 100 / x)
        .Select(x => x + 5);

If I run the observable I get this:

Normal Query

Let's transform this with with Exceptional and see how it allows us to continue processing when an error occurs.

IObservable<Exceptional<int>> query =
    Observable
        .Range(0, 10)
        .Select(x => x.ToExceptional())
        .Select(x => x.Select(y => 5 - y))
        .Select(x => x.Select(y => 100 / y))
        .Select(x => x.Select(y => y + 5));

Now when I run it I get this:

Query with Exceptional

Now I could test each result, see if HasException is true and log each exception, meanwhile the observable continues.

Finally, it's easy to clean up the query to look almost the same as the original by introducing one further extension method.

    public static IObservable<Exceptional<U>> Select<T, U>(this IObservable<Exceptional<T>> source, Func<T, U> m) =>
        source.Select(x => x.SelectMany(y => Exceptional.From(() => m(y))));

This combines observables and exceptionals into a single Select operator.

Now the query can look like this:

IObservable<Exceptional<int>> query =
    Observable
        .Range(0, 10)
        .Select(x => x.ToExceptional())
        .Select(x => 5 - x)
        .Select(x => 100 / x)
        .Select(x => x + 5);

I get the same result at before.


Finally, I could get this all working with query syntax by adding two more extension methods:

public static IObservable<Exceptional<U>> SelectMany<T, U>(this IObservable<T> source, Func<T, Exceptional<U>> k) =>
    source.Select(t => k(t));

public static IObservable<Exceptional<V>> SelectMany<T, U, V>(this IObservable<T> source, Func<T, Exceptional<U>> k, Func<T, U, V> m) =>
    source.SelectMany(t => k(t).SelectMany(u => Exceptional.From(() => m(t, u))));

This allows:

IObservable<Exceptional<int>> query =
    from n in Observable.Range(0, 10)
    from x in n.ToExceptional()
    let a = 5 - x
    let b = 100 / a
    select b + 5;

Again, I get the same results as before.

like image 60
Enigmativity Avatar answered Oct 02 '22 02:10

Enigmativity