Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do I adjust timeout duration on retry using RxJS?

I'm working with the latest Angular and Typescript and RxJS 5.

Angular has currently made RxJS a necessity. I've used C# primarily for over 10 years and I'm very much used to Linq/Lambdas/fluent syntax which I assume formed the basis of Reactive.

I would like to make an Http get call with an increasing timeout value on retry, but I'm having a problem seeing how to do that and still keeping everything in the pipeline (not using external state).

I get that I can do this, but it will just retry using the same timeout value.

myHttpObservable.timeout(1000).retry(2);

The documentation for RxJS has been poor in many places and asking about it on here only got my question deleted out of existence, which is sad...so I was forced to look through the source.

Is there a way to retry with an increasing timeout duration each time in a way that keeps state in the pipeline? Also, I want an innitial timeout on the first attempt.

I've tried things similar to this at first, but realized the confusing retryWhen operator is not really intended for what I want:

myHttpObservable.timeout(1000).retryWhen((theSubject: Observable<Error>) => {
 return  aNewMyObservableCreatedinHere.timeout(2000);  
});

I know I could accomplish this using external state, but I'm basically looking for an elegant solution that, I think, is what they are kind of driving for with the reactive style of programming.

like image 813
Brandon Avatar asked Jan 26 '17 11:01

Brandon


3 Answers

I use delayWhen to create a notifier function that makes retryWhen emit on increasing delays after each error occurrence. You can choose different time series by changing the formula used to calculate the delay between retries. See the two example formulas below for the geometric and exponential backoff retry strategies. Note also how I limit the number of retries and throw an error if I reach that limit.

const initialDelay = 1000;
const maxRetries = 4;
throwError('oops')
  .pipe(
    retryWhen(errors =>
      errors.pipe(
        delayWhen((_,i) => {
          // const delay = (i+1)*initialDelay;            // geometric
          const delay = Math.pow(2,i)*initialDelay;       // exponential
          console.log(`retrying after ${delay} msec...`);
          return timer(delay);
        }),
        take(maxRetries),
        concat(throwError('number of retries exceeded')))))
  .subscribe(
    x => console.log('value:', x),
    e => console.error('error:', e)
  );
like image 169
José Carballosa Avatar answered Nov 20 '22 22:11

José Carballosa


Based on additional input in the comments of my previous answer I have been experimenting with the .expand() operator to solve your requirement:

I want to make a get with timeout duration = X and if it times out, then retry with timeout = X+1

The following snippet starts with timeout = 500 and for each attempt the timeout is increased with attempt * 500 until maxAttempts has been reached or the result has been successfully retrieved:

getWithExpandingTimeout('http://reaches-max-attempts', 3)
  .subscribe(
    res => console.log(res),
    err => console.log('ERROR: ' + err.message)
  );

getWithExpandingTimeout('http://eventually-finishes-within-timeout', 10)
  .subscribe(
    res => console.log(res),
    err => console.log('ERROR: ' + err.message)
  );



/*
retrieve the given url and keep trying with an expanding 
timeout until it succeeds or maxAttempts has been reached
*/
function getWithExpandingTimeout(url, maxAttempts) {
  return get(url, 1)
    .expand(res => {
      if(res.error) {
        const nextAttempt = 1 + res.attempt;
        if(nextAttempt > maxAttempts) {
          // too many retry attempts done
          return Rx.Observable.throw(new Error(`Max attempts reached to retrieve url ${url}: ${res.error.message}`));
        }
        
        // retry next attempt
        return get(url, nextAttempt);
      }
      return Rx.Observable.empty(); // done with retrying, result has been emitted
    })
    .filter(res => !res.error);
}

/*
 retrieve info from server with timeout based on attempt
 NOTE: does not errors the stream so expand() keeps working
*/
function get(url, attempt) {
  return Rx.Observable.of(`result for ${url} returned after #${attempt} attempts`)
    .do(() => console.log(`starting attempt #${attempt} to retrieve ${url}`))
    .delay(5 * 500)
    .timeout(attempt * 500)
    .catch(error => Rx.Observable.of({ attempt: attempt, error }));
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>

How does this work

Every value that is produced upstream or by the .expand() operator is emitted downstream AND used as input for the .expand() operator. This continues until no further values are emitted. By leveraging this behaviour the .get() function gets re-attempted with an increasing attempt when the emission contains an .error value inside.

The .get() function does not throw an Error because otherwise we need to catch it in the .expand() or the recursion will break unexpected.

When the maxAttempts is exceeded the .expand() operator throws an Error stopping the recursion. When no .error property is present in the emission then we expect this to be a succes result and emit an empty Observable, stopping the recursion.

NOTE: It uses .filter() to remove all emissions based on the .error property because all values produced by the .expand() are also emitted downstream but these .error values are internal state.

like image 32
Mark van Straten Avatar answered Nov 20 '22 22:11

Mark van Straten


There is an operator in backoff-rxjs npm package to deal with this case called retryBackoff. I described it in the article at blog.angularindepth.com, but in a nutshell it's this:

source.pipe(
  retryWhen(errors => errors.pipe(
      concatMap((error, iteration) => 
          timer(Math.pow(2, iteration) * initialInterval, maxInterval))));

Here are the sources for the more customizable version.

like image 7
Alex Okrushko Avatar answered Nov 20 '22 21:11

Alex Okrushko