Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Write an Rx "RetryAfter" extension method

In the book IntroToRx the author suggest to write a "smart" retry for I/O which retry an I/O request, like a network request, after a period of time.

Here is the exact paragraph:

A useful extension method to add to your own library might be a "Back Off and Retry" method. The teams I have worked with have found such a feature useful when performing I/O, especially network requests. The concept is to try, and on failure wait for a given period of time and then try again. Your version of this method may take into account the type of Exception you want to retry on, as well as the maximum number of times to retry. You may even want to lengthen the to wait period to be less aggressive on each subsequent retry.

Unfortunately, I can't figure out how to write this method. :(

like image 396
nverinaud Avatar asked Sep 24 '13 10:09

nverinaud


4 Answers

The key to this implementation of a back off retry is deferred observables. A deferred observable won't execute its factory until someone subscribes to it. And it will invoke the factory for each subscription, making it ideal for our retry scenario.

Assume we have a method which triggers a network request.

public IObservable<WebResponse> SomeApiMethod() { ... }

For the purposes of this little snippet, let's define the deferred as source

var source = Observable.Defer(() => SomeApiMethod());

Whenever someone subscribes to source it will invoke SomeApiMethod and launch a new web request. The naive way to retry it whenever it fails would be using the built in Retry operator.

source.Retry(4)

That wouldn't be very nice to the API though and it's not what you're asking for. We need to delay the launching of requests in between each attempt. One way of doing that is with a delayed subscription.

Observable.Defer(() => source.DelaySubscription(TimeSpan.FromSeconds(1))).Retry(4)

That's not ideal since it'll add the delay even on the first request, let's fix that.

int attempt = 0;
Observable.Defer(() => { 
   return ((++attempt == 1)  ? source : source.DelaySubscription(TimeSpan.FromSeconds(1)))
})
.Retry(4)
.Select(response => ...)

Just pausing for a second isn't a very good retry method though so let's change that constant to be a function which receives the retry count and returns an appropriate delay. Exponential back off is easy enough to implement.

Func<int, TimeSpan> strategy = n => TimeSpan.FromSeconds(Math.Pow(n, 2));

((++attempt == 1)  ? source : source.DelaySubscription(strategy(attempt - 1)))

We're almost done now, we just need to add a way of specifying for which exceptions we should retry. Let's add a function that given an exception returns whether or not it makes sense to retry, we'll call it retryOnError.

Now we need to write some scary looking code but bear with me.

Observable.Defer(() => {
    return ((++attempt == 1)  ? source : source.DelaySubscription(strategy(attempt - 1)))
        .Select(item => new Tuple<bool, WebResponse, Exception>(true, item, null))
        .Catch<Tuple<bool, WebResponse, Exception>, Exception>(e => retryOnError(e)
            ? Observable.Throw<Tuple<bool, WebResponse, Exception>>(e)
            : Observable.Return(new Tuple<bool, WebResponse, Exception>(false, null, e)));
})
.Retry(retryCount)
.SelectMany(t => t.Item1
    ? Observable.Return(t.Item2)
    : Observable.Throw<T>(t.Item3))

All of those angle brackets are there to marshal an exception for which we shouldn't retry past the .Retry(). We've made the inner observable be an IObservable<Tuple<bool, WebResponse, Exception>> where the first bool indicates if we have a response or an exception. If retryOnError indicates that we should retry for a particular exception the inner observable will throw and that will be picked up by the retry. The SelectMany just unwraps our Tuple and makes the resulting observable be IObservable<WebRequest> again.

See my gist with full source and tests for the final version. Having this operator allows us to write our retry code quite succinctly

Observable.Defer(() => SomApiMethod())
  .RetryWithBackoffStrategy(
     retryCount: 4, 
     retryOnError: e => e is ApiRetryWebException
  )
like image 105
Markus Olsson Avatar answered Nov 23 '22 10:11

Markus Olsson


Maybe I'm over simplifying the situation, but if we look at the implementation of Retry, it is simply an Observable.Catch over an infinite enumerable of observables:

private static IEnumerable<T> RepeatInfinite<T>(T value)
{
    while (true)
        yield return value;
}

public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source)
{
    return Observable.Catch<TSource>(QueryLanguage.RepeatInfinite<IObservable<TSource>(source));
}

So if we take this approach, we can just add a delay after the first yield.

private static IEnumerable<IObservable<TSource>> RepeateInfinite<TSource> (IObservable<TSource> source, TimeSpan dueTime)
{
    // Don't delay the first time        
    yield return source;

    while (true)
        yield return source.DelaySubscription(dueTime);
    }

public static IObservable<TSource> RetryAfterDelay<TSource>(this IObservable<TSource> source, TimeSpan dueTime)
{
    return RepeateInfinite(source, dueTime).Catch();
}

An overload that catches a specific exception with a retry count can be even more concise:

public static IObservable<TSource> RetryAfterDelay<TSource, TException>(this IObservable<TSource> source, TimeSpan dueTime, int count) where TException : Exception
{
    return source.Catch<TSource, TException>(exception =>
    {
        if (count <= 0)
        {
            return Observable.Throw<TSource>(exception);
        }

        return source.DelaySubscription(dueTime).RetryAfterDelay<TSource, TException>(dueTime, --count);
    });
}

Note that the overload here is using recursion. On first appearances, it would seem that a StackOverflowException is possible if count was something like Int32.MaxValue. However, DelaySubscription uses a scheduler to run the subscription action, so stack overflow would not be possible (i.e. using "trampolining"). I guess this isn't really obvious by looking at the code though. We could force a stack overflow by explicitly setting the scheduler in the DelaySubscription overload to Scheduler.Immediate, and passing in TimeSpan.Zero and Int32.MaxValue. We could pass in a non-immediate scheduler to express our intent a little more explicitly, e.g.:

return source.DelaySubscription(dueTime, TaskPoolScheduler.Default).RetryAfterDelay<TSource, TException>(dueTime, --count);

UPDATE: Added overload to take in a specific scheduler.

public static IObservable<TSource> RetryAfterDelay<TSource, TException>(
    this IObservable<TSource> source,
    TimeSpan retryDelay,
    int retryCount,
    IScheduler scheduler) where TException : Exception
{
    return source.Catch<TSource, TException>(
        ex =>
        {
            if (retryCount <= 0)
            {
                return Observable.Throw<TSource>(ex);
            }

            return
                source.DelaySubscription(retryDelay, scheduler)
                    .RetryAfterDelay<TSource, TException>(retryDelay, --retryCount, scheduler);
        });
} 
like image 21
12 revs Avatar answered Nov 23 '22 10:11

12 revs


Here's the one I'm using:

public static IObservable<T> DelayedRetry<T>(this IObservable<T> src, TimeSpan delay)
{
    Contract.Requires(src != null);
    Contract.Ensures(Contract.Result<IObservable<T>>() != null);

    if (delay == TimeSpan.Zero) return src.Retry();
    return src.Catch(Observable.Timer(delay).SelectMany(x => src).Retry());
}
like image 44
Cory Nelson Avatar answered Nov 23 '22 10:11

Cory Nelson


Based on Markus' answer I wrote the following:

public static class ObservableExtensions
{
    private static IObservable<T> BackOffAndRetry<T>(
        this IObservable<T> source,
        Func<int, TimeSpan> strategy,
        Func<int, Exception, bool> retryOnError,
        int attempt)
    {
        return Observable
            .Defer(() =>
            {
                var delay = attempt == 0 ? TimeSpan.Zero : strategy(attempt);
                var s = delay == TimeSpan.Zero ? source : source.DelaySubscription(delay);

                return s
                    .Catch<T, Exception>(e =>
                    {
                        if (retryOnError(attempt, e))
                        {
                            return source.BackOffAndRetry(strategy, retryOnError, attempt + 1);
                        }
                        return Observable.Throw<T>(e);
                    });
            });
    }

    public static IObservable<T> BackOffAndRetry<T>(
        this IObservable<T> source,
        Func<int, TimeSpan> strategy,
        Func<int, Exception, bool> retryOnError)
    {
        return source.BackOffAndRetry(strategy, retryOnError, 0);
    }
}

I like it more because

  • it doesn't modify attempts but uses recursion.
  • It doesn't use retries but passes the number of attempts to retryOnError
like image 27
Johannes Egger Avatar answered Nov 23 '22 12:11

Johannes Egger