Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Rx back off and retry

This is based on the code presented in this SO : Write an Rx "RetryAfter" extension method

I am using the code by Markus Olsson (evaluation only at the moment), and before anyone asks I have tried to get hold of Markus on Github, but that is blocked where I work, so I felt the only thing I could do was ask here at SO. Sorry about that, if this sits badly with any one.

So I am using the following code, in a small demo which is this:

class Attempt1
{
    private static bool shouldThrow = true;

    static void Main(string[] args)
    {
        Generate().RetryWithBackoffStrategy(3,
               MyRxExtensions.ExponentialBackoff,
            ex =>
            {
                return ex is NullReferenceException;
            }, Scheduler.TaskPool)
            .Subscribe(
                OnNext,
                OnError
            );

        Console.ReadLine();
    }

    private static void OnNext(int val)
    {
        Console.WriteLine("subscriber value is {0} which was seen on threadId:{1}",
            val, Thread.CurrentThread.ManagedThreadId);
    }

    private static void OnError(Exception ex)
    {
        Console.WriteLine("subscriber bad {0}, which was seen on threadId:{1}",
            ex.GetType(),
            Thread.CurrentThread.ManagedThreadId);
    }

    static IObservable<int> Generate()
    {
        return Observable.Create<int>(
            o =>
            {
                Scheduler.TaskPool.Schedule(() =>
                {
                    if (shouldThrow)
                    {
                        shouldThrow = false;
                        Console.WriteLine("ON ERROR NullReferenceException");
                        o.OnError(new NullReferenceException("Throwing"));
                    }
                    Console.WriteLine("Invoked on threadId:{0}",
                        Thread.CurrentThread.ManagedThreadId);

                    Console.WriteLine("On nexting 1");
                    o.OnNext(1);
                    Console.WriteLine("On nexting 2");
                    o.OnNext(2);
                    Console.WriteLine("On nexting 3");
                    o.OnNext(3);
                    o.OnCompleted();
                    Console.WriteLine("On complete");
                    Console.WriteLine("Finished on threadId:{0}",
                        Thread.CurrentThread.ManagedThreadId);

                });

                return () => { };
            });
    }
}

public static class MyRxExtensions
{
    /// <summary>
    /// An exponential back off strategy which starts with 1 second and then 4, 9, 16...
    /// </summary>
    public static readonly Func<int, TimeSpan>
        ExponentialBackoff = n => TimeSpan.FromSeconds(Math.Pow(n, 2));

    public static IObservable<T> RetryWithBackoffStrategy<T>(
        this IObservable<T> source,
        int retryCount = 3,
        Func<int, TimeSpan> strategy = null,
        Func<Exception, bool> retryOnError = null,
        IScheduler scheduler = null)
    {
        strategy = strategy ?? MyRxExtensions.ExponentialBackoff;

        int attempt = 0;

        return Observable.Defer(() =>
        {
            return ((++attempt == 1) ? source : source.Delay(strategy(attempt - 1), scheduler))
                .Select(item => new Tuple<bool, T, Exception>(true, item, null))
                .Catch<Tuple<bool, T, Exception>, Exception>(e =>
                    retryOnError(e)
                    ? Observable.Throw<Tuple<bool, T, Exception>>(e)
                    : Observable.Return(new Tuple<bool, T, Exception>(false, default(T), e)));
        })
        .Retry(retryCount)
        .SelectMany(t => t.Item1
            ? Observable.Return(t.Item2)
            : Observable.Throw<T>(t.Item3));
    }
}

The code makes sense to me, we try and do an operation if that fails we back off and retry. The exception type that we want to retry can be specified, and we also see that the subscriber only sees the final values once after retry which works (in the demo code above the Exception is only done (OnError'd the first time)).

So generally the code works as expected except for one thing.

If I look at the output the code above produces I get this:

ON ERROR NullReferenceException 
Invoked on threadId:10 
On nexting 1
Invoked on threadId:11 
On nexting 1 
On nexting 2 
On nexting 3 
On complete 
Finished on threadId:10 
On nexting 2 
On nexting 3 
On complete 
Finished on threadId:11 
subscriber value is 1 which was seen on threadId:10 
subscriber value is 2 which was seen on threadId:10
subscriber value is 3 which was seen on threadId:10

The interesting thing for me here is that the subscriber values all come in one go, I would have expected that when the OnNext within the Generate() method were called that the Subscribers OnNext would write the OnNext'ed value to the Console output.

Could anyone shed any light on why this might be?

like image 253
sacha barber Avatar asked Nov 25 '13 09:11

sacha barber


People also ask

What is retry RxJava?

RxJava 1․ Another variant of retry takes a single parameter: a count of the number of times it should try to resubscribe to the source Observable when it encounters errors. If this count is exceeded, retry will not attempt to resubscribe again and will instead pass the latest onError notification to its observers.

What is backoff strategy?

Basically, a backoff strategy is a technique that we can use to retry failing function calls after a given delay - and keep retrying them until either the function call works, or until we've tried so many times that we just give up and handle the error.

What is retry jitter?

If errors are caused by load, retries can be ineffective if all clients retry at the same time. To avoid this problem, we employ jitter. This is a random amount of time before making or retrying a request to help prevent large bursts by spreading out the arrival rate.

How does exponential backoff work?

An exponential backoff algorithm retries requests exponentially, increasing the waiting time between retries up to a maximum backoff time. For example: Make a request to Cloud IoT Core. If the request fails, wait 1 + random_number_milliseconds seconds and retry the request.


2 Answers

For anyone else that happens upon this post, it was indeed fixed by suggestions made by James World, and Brandon (thanks chaps).

Here is full working code

class Attempt1
{
    private static bool shouldThrow = true;

    static void Main(string[] args)
    {
        Generate().RetryWithBackoffStrategy(3, MyRxExtensions.ExponentialBackoff,
            ex =>
            {
                return ex is NullReferenceException;
            }, Scheduler.TaskPool)
            .Subscribe(
                OnNext,
                OnError
            );

        Console.ReadLine();
    }

    private static void OnNext(int val)
    {
        Console.WriteLine("subscriber value is {0} which was seen on threadId:{1}", 
            val, Thread.CurrentThread.ManagedThreadId);
    }

    private static void OnError(Exception ex)
    {
        Console.WriteLine("subscriber bad {0}, which was seen on threadId:{1}", 
            ex.GetType(),
            Thread.CurrentThread.ManagedThreadId);
    }

    static IObservable<int> Generate()
    {
        return Observable.Create<int>(
            o =>
            {
                Scheduler.TaskPool.Schedule(() =>
                    {
                        if (shouldThrow)
                        {
                            shouldThrow = false;
                            Console.WriteLine("ON ERROR NullReferenceException");
                            o.OnError(new NullReferenceException("Throwing"));
                        }
                        Console.WriteLine("Invoked on threadId:{0}", 
                            Thread.CurrentThread.ManagedThreadId);

                        Console.WriteLine("On nexting 1");
                        o.OnNext(1);
                        Console.WriteLine("On nexting 2");
                        o.OnNext(2);
                        Console.WriteLine("On nexting 3");
                        o.OnNext(3);
                        o.OnCompleted();
                        Console.WriteLine("On complete");
                        Console.WriteLine("Finished on threadId:{0}", 
                            Thread.CurrentThread.ManagedThreadId);
                    });

                return () => { };
            });
    }
}

public static class MyRxExtensions
{
    /// <summary>
    /// An exponential back off strategy which starts with 1 second and then 4, 9, 16...
    /// </summary>
    public static readonly Func<int, TimeSpan> ExponentialBackoff = n => TimeSpan.FromSeconds(Math.Pow(n, 2));

    public static IObservable<T> RetryWithBackoffStrategy<T>(
        this IObservable<T> source,
        int retryCount = 3,
        Func<int, TimeSpan> strategy = null,
        Func<Exception, bool> retryOnError = null,
        IScheduler scheduler = null)
    {
        strategy = strategy ?? MyRxExtensions.ExponentialBackoff;

        int attempt = 0;

        return Observable.Defer(() =>
        {
            return ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1), scheduler))
                .Select(item => new Tuple<bool, T, Exception>(true, item, null))
                .Catch<Tuple<bool, T, Exception>, Exception>(e =>
                    retryOnError(e)
                    ? Observable.Throw<Tuple<bool, T, Exception>>(e)
                    : Observable.Return(new Tuple<bool, T, Exception>(false, default(T), e)));
        })
        .Retry(retryCount)
        .SelectMany(t => t.Item1
            ? Observable.Return(t.Item2)
            : Observable.Throw<T>(t.Item3));
    }

    public static IObservable<T> DelaySubscription<T>(this IObservable<T> source, 
        TimeSpan delay, IScheduler scheduler = null)
    {
        if (scheduler == null)
        {
            return Observable.Timer(delay).SelectMany(_ => source);
        }
        return Observable.Timer(delay, scheduler).SelectMany(_ => source);
    }
}

Which produces the desired output of

ON ERROR NullReferenceException
Invoked on threadId:11
On nexting 1
On nexting 2
On nexting 3
On complete
Finished on threadId:11
Invoked on threadId:11
On nexting 1
subscriber value is 1 which was seen on threadId:11
On nexting 2
subscriber value is 2 which was seen on threadId:11
On nexting 3
subscriber value is 3 which was seen on threadId:11
On complete
Finished on threadId:11
like image 190
sacha barber Avatar answered Oct 03 '22 12:10

sacha barber


It's because you are putting a Delay on the result stream. (The value for n passed to ExponentialBackoff on the second iteration is 1, giving a delay of 1 second.)

Delay operates on source, but the source proceeds as normal. Delay schedules the results it receives to be emitted after the specified duration. So the subscriber is getting the results after the logic of Generate has run to completion.

If you think about it this is how Delay must be - otherwise Delay would be able to somehow interfere with upstream operators!

It is possible to interfere with upstream operators (without throwing exceptions), by being a slow consumer. But that would certainly be a very bad way for a simple Delay to behave.

I don't think the Delay is what you intend here - because Delay doesn't delay it's subscription. If you use DelaySubscription instead, you'll get what you're after I think. This is what's used in the linked question too.

Your question provides a great illustration of the difference between Delay and DelaySubscription! It's worth thinking about Defer in here too.

The distinction between these three is subtle but significant, so let's summarize all three:

  • Delay - Calls target operator immediately to get an IObservable, on its Subscribe calls Subscribe on target immediately, schedules events for delivery after specified delay on the specified Scheduler.

  • DelaySubscription - Calls target operator immediately to get an IObservable. On its Subscribe schedules Subscribe on target for execution after specified delay on the specified Scheduler.

  • Defer - Has no target operator. On Subscribe runs provided factory function to get target IObservable and immediately calls Subscribe. There's no delay added, hence no Scheduler to specify.

like image 20
James World Avatar answered Oct 03 '22 14:10

James World