I have created an observable that consists of an item being transformed to another by running an async method.
IObservable<Summary> obs = scanner.Scans
.SelectMany(b => GetAssignment(b))
.SelectMany(b => VerifyAssignment(b))
.SelectMany(b => ConfirmAssignmentData(b))
.SelectMany(b => UploadAsset(b))
.Select(assignment => new Summary())
.Catch(LogException());
I would like to make this fail-proof, so in case an exception is thrown during the processing, I should log the exception, but ignore the exception and resume with the next scan (the next item pushed by scanner.Scans
)
The current code catches any exception, but the sequence finished as soon as an exception is thrown.
How can I make it "swallow" the exception (logging it), but to resume with the next item?
Catch errors in the observable stream Another option to catch errors is to use the CatchError Operator. The CatchError Operators catches the error in the observable stream as and when the error happens. This allows us to retry the failed observable or use a replacement observable.
The catchError operator is going to take the error and pass it to the error handling function. That function is expected to return an Observable which is going to be a replacement Observable for the stream that just errored out.
The exception handler function, producing another observable sequence. An observable sequence containing the source sequence's elements, followed by the elements produced by the handler's resulting observable sequence in case an exception occurred.
The exceptions that can occur in a MIPS pipeline are: • IF – Page fault, misaligned memory access, memory protection violation • MEM – Page fault on data, misaligned memory access, memory protection violation In MIPS, exceptions are managed by a System Control Coprocessor (CP0). It saves the PC of the offending or interrupted instruction.
Handle Exception Only RxJava also provides a fallback method that allows continuing the sequence with a provided Observable when an exception (but no error) is raised: As the code above shows, when an error does occur, the onExceptionResumeNext won't kick in to resume the sequence.
RxJava also provides a fallback method that allows continuing the sequence with a provided Observable when an exception (but no error) is raised: As the code above shows, when an error does occur, the onExceptionResumeNext won't kick in to resume the sequence.
Rx is a functional paradigm so it's very useful to use a functional approach to solving this problem.
The answer is to introduce another monad that can cope with errors, like Nullable<T>
can cope with integers having a null value, but in this case a class that can either represent a value or an exception.
public class Exceptional
{
public static Exceptional<T> From<T>(T value) => new Exceptional<T>(value);
public static Exceptional<T> From<T>(Exception ex) => new Exceptional<T>(ex);
public static Exceptional<T> From<T>(Func<T> factory) => new Exceptional<T>(factory);
}
public class Exceptional<T>
{
public bool HasException { get; private set; }
public Exception Exception { get; private set; }
public T Value { get; private set; }
public Exceptional(T value)
{
this.HasException = false;
this.Value = value;
}
public Exceptional(Exception exception)
{
this.HasException = true;
this.Exception = exception;
}
public Exceptional(Func<T> factory)
{
try
{
this.Value = factory();
this.HasException = false;
}
catch (Exception ex)
{
this.Exception = ex;
this.HasException = true;
}
}
public override string ToString() =>
this.HasException
? this.Exception.GetType().Name
: (this.Value != null ? this.Value.ToString() : "null");
}
public static class ExceptionalExtensions
{
public static Exceptional<T> ToExceptional<T>(this T value) => Exceptional.From(value);
public static Exceptional<T> ToExceptional<T>(this Func<T> factory) => Exceptional.From(factory);
public static Exceptional<U> Select<T, U>(this Exceptional<T> value, Func<T, U> m) =>
value.SelectMany(t => Exceptional.From(() => m(t)));
public static Exceptional<U> SelectMany<T, U>(this Exceptional<T> value, Func<T, Exceptional<U>> k) =>
value.HasException ? Exceptional.From<U>(value.Exception) : k(value.Value);
public static Exceptional<V> SelectMany<T, U, V>(this Exceptional<T> value, Func<T, Exceptional<U>> k, Func<T, U, V> m) =>
value.SelectMany(t => k(t).SelectMany(u => Exceptional.From(() => m(t, u))));
}
So, let's start by creating an Rx query that throws an exception.
IObservable<int> query =
Observable
.Range(0, 10)
.Select(x => 5 - x)
.Select(x => 100 / x)
.Select(x => x + 5);
If I run the observable I get this:
Let's transform this with with Exceptional
and see how it allows us to continue processing when an error occurs.
IObservable<Exceptional<int>> query =
Observable
.Range(0, 10)
.Select(x => x.ToExceptional())
.Select(x => x.Select(y => 5 - y))
.Select(x => x.Select(y => 100 / y))
.Select(x => x.Select(y => y + 5));
Now when I run it I get this:
Now I could test each result, see if HasException
is true
and log each exception, meanwhile the observable continues.
Finally, it's easy to clean up the query to look almost the same as the original by introducing one further extension method.
public static IObservable<Exceptional<U>> Select<T, U>(this IObservable<Exceptional<T>> source, Func<T, U> m) =>
source.Select(x => x.SelectMany(y => Exceptional.From(() => m(y))));
This combines observables and exceptionals into a single Select
operator.
Now the query can look like this:
IObservable<Exceptional<int>> query =
Observable
.Range(0, 10)
.Select(x => x.ToExceptional())
.Select(x => 5 - x)
.Select(x => 100 / x)
.Select(x => x + 5);
I get the same result at before.
Finally, I could get this all working with query syntax by adding two more extension methods:
public static IObservable<Exceptional<U>> SelectMany<T, U>(this IObservable<T> source, Func<T, Exceptional<U>> k) =>
source.Select(t => k(t));
public static IObservable<Exceptional<V>> SelectMany<T, U, V>(this IObservable<T> source, Func<T, Exceptional<U>> k, Func<T, U, V> m) =>
source.SelectMany(t => k(t).SelectMany(u => Exceptional.From(() => m(t, u))));
This allows:
IObservable<Exceptional<int>> query =
from n in Observable.Range(0, 10)
from x in n.ToExceptional()
let a = 5 - x
let b = 100 / a
select b + 5;
Again, I get the same results as before.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With