I'm working with the latest Angular and TypeScript, and RxJS 5.
Angular has effectively made RxJS a necessity. I've used C# primarily for over 10 years and I'm very much used to LINQ/lambdas/fluent syntax, which I assume formed the basis of Reactive.
I would like to make an HTTP GET call with an increasing timeout value on each retry, but I'm having trouble seeing how to do that while keeping everything in the pipeline (not using external state).
I get that I can do this, but it will just retry using the same timeout value.
myHttpObservable.timeout(1000).retry(2);
The documentation for RxJS has been poor in many places and asking about it on here only got my question deleted out of existence, which is sad...so I was forced to look through the source.
Is there a way to retry with an increasing timeout duration each time in a way that keeps state in the pipeline? Also, I want an initial timeout on the first attempt.
I've tried things similar to this at first, but realized the confusing retryWhen operator is not really intended for what I want:
myHttpObservable.timeout(1000).retryWhen((theSubject: Observable<Error>) => {
  return aNewMyObservableCreatedinHere.timeout(2000);
});
I know I could accomplish this using external state, but I'm basically looking for an elegant solution that, I think, is what they are kind of driving for with the reactive style of programming.
I use delayWhen to create a notifier function that makes retryWhen emit after an increasing delay on each error. You can choose a different time series by changing the formula used to calculate the delay between retries. See the two example formulas below for the linear and exponential backoff retry strategies. Note also how I limit the number of retries and throw an error once that limit is reached.
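// RxJS 6 import paths
import { throwError, timer } from 'rxjs';
import { concat, delayWhen, retryWhen, take } from 'rxjs/operators';

const initialDelay = 1000;
const maxRetries = 4;

throwError('oops')
  .pipe(
    retryWhen(errors =>
      errors.pipe(
        // delay each error notification; i is the zero-based error index
        delayWhen((_, i) => {
          // const delay = (i + 1) * initialDelay; // linear
          const delay = Math.pow(2, i) * initialDelay; // exponential
          console.log(`retrying after ${delay} msec...`);
          return timer(delay);
        }),
        // stop after maxRetries notifications, then fail the stream
        take(maxRetries),
        concat(throwError('number of retries exceeded')))))
  .subscribe(
    x => console.log('value:', x),
    e => console.error('error:', e)
  );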
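To see which delays the two formulas above actually produce, here is a minimal sketch in plain TypeScript with no RxJS involved. The names backoffDelay, backoffSchedule and Strategy are made up for this illustration and are not part of any library; only the two formulas come from the snippet above.

```typescript
// Hypothetical helper names; only the two formulas come from the answer.
type Strategy = 'linear' | 'exponential';

// Delay before retry number i (zero-based), in milliseconds.
function backoffDelay(i: number, initialDelay: number, strategy: Strategy): number {
  return strategy === 'linear'
    ? (i + 1) * initialDelay
    : Math.pow(2, i) * initialDelay;
}

// The full list of delays for maxRetries retries.
function backoffSchedule(maxRetries: number, initialDelay: number, strategy: Strategy): number[] {
  return Array.from({ length: maxRetries }, (_, i) => backoffDelay(i, initialDelay, strategy));
}

console.log(backoffSchedule(4, 1000, 'linear'));      // [1000, 2000, 3000, 4000]
console.log(backoffSchedule(4, 1000, 'exponential')); // [1000, 2000, 4000, 8000]
```

The first two delays are the same for both strategies; they only diverge from the third retry onward.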
Based on additional input in the comments on my previous answer, I have been experimenting with the .expand() operator to solve your requirement:
I want to make a get with timeout duration = X and if it times out, then retry with timeout = X+1
The following snippet starts with a timeout of 500 ms and uses a timeout of attempt * 500 ms for each subsequent attempt, until maxAttempts has been reached or the result has been successfully retrieved:
getWithExpandingTimeout('http://reaches-max-attempts', 3)
  .subscribe(
    res => console.log(res),
    err => console.log('ERROR: ' + err.message)
  );

getWithExpandingTimeout('http://eventually-finishes-within-timeout', 10)
  .subscribe(
    res => console.log(res),
    err => console.log('ERROR: ' + err.message)
  );

/*
  retrieve the given url and keep trying with an expanding
  timeout until it succeeds or maxAttempts has been reached
*/
function getWithExpandingTimeout(url, maxAttempts) {
  return get(url, 1)
    .expand(res => {
      if (res.error) {
        const nextAttempt = 1 + res.attempt;
        if (nextAttempt > maxAttempts) {
          // too many retry attempts done
          return Rx.Observable.throw(new Error(`Max attempts reached to retrieve url ${url}: ${res.error.message}`));
        }
        // retry next attempt
        return get(url, nextAttempt);
      }
      return Rx.Observable.empty(); // done with retrying, result has been emitted
    })
    .filter(res => !res.error);
}

/*
  retrieve info from server with timeout based on attempt
  NOTE: does not error the stream, so expand() keeps working
*/
function get(url, attempt) {
  return Rx.Observable.of(`result for ${url} returned after #${attempt} attempts`)
    .do(() => console.log(`starting attempt #${attempt} to retrieve ${url}`))
    .delay(5 * 500)
    .timeout(attempt * 500)
    .catch(error => Rx.Observable.of({ attempt: attempt, error }));
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>
Every value that is produced upstream or by the .expand() operator is emitted downstream AND used as input for the .expand() operator. This continues until no further values are emitted. By leveraging this behaviour, the get() function is re-attempted with an increasing attempt number whenever the emission contains an .error value.
The get() function does not throw an Error, because otherwise we would need to catch it inside the .expand() or the recursion would break unexpectedly.
When maxAttempts is exceeded, the .expand() operator throws an Error, stopping the recursion. When no .error property is present in the emission, we treat it as a successful result and return an empty Observable, which also stops the recursion.
NOTE: It uses .filter() to remove all emissions that carry an .error property, because all values produced by the .expand() are also emitted downstream, but these .error values are internal state.
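The recursion described above can be modelled without RxJS. The following is a rough synchronous sketch, not the operator's real implementation: expandSync, fakeGet and attemptsUntilSuccess are hypothetical names, the "server" is assumed to need 2500 ms (mirroring the .delay(5 * 500) in the snippet), and a tie between delay and timeout is assumed to count as a timeout.

```typescript
// One attempt's outcome; `error` is set when the attempt timed out.
interface Attempt {
  attempt: number;
  error?: string;
}

// Hypothetical stand-in for get(): the response needs 2500 ms and the
// timeout is attempt * 500 ms. Assumption: a tie counts as a timeout.
function fakeGet(attempt: number): Attempt {
  const timeoutMs = attempt * 500;
  return timeoutMs > 2500
    ? { attempt }
    : { attempt, error: `timeout after ${timeoutMs} ms` };
}

// Synchronous model of expand(): each value is emitted and then fed back
// into `project`; the recursion ends when `project` returns nothing.
function expandSync<T>(seed: T, project: (value: T) => T[]): T[] {
  const emitted: T[] = [];
  const pending: T[] = [seed];
  while (pending.length > 0) {
    const value = pending.shift() as T;
    emitted.push(value);
    pending.push(...project(value));
  }
  return emitted;
}

function attemptsUntilSuccess(maxAttempts: number): Attempt[] {
  return expandSync(fakeGet(1), res => {
    if (res.error === undefined) return []; // success: stop recursing
    const next = res.attempt + 1;
    if (next > maxAttempts) throw new Error(`Max attempts reached: ${res.error}`);
    return [fakeGet(next)];
  });
}

const allAttempts = attemptsUntilSuccess(10);
console.log(allAttempts.map(a => a.attempt));                // [1, 2, 3, 4, 5, 6]
console.log(allAttempts.filter(a => a.error === undefined)); // [ { attempt: 6 } ]
```

The unfiltered list shows why the final .filter() is needed: the five failed attempts are emitted too, and only the last value is the real result. Calling attemptsUntilSuccess(3) throws, which corresponds to the max-attempts error case.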
There is an operator called retryBackoff in the backoff-rxjs npm package to deal with this case. I described it in the article at blog.angularindepth.com, but in a nutshell it's this:
source.pipe(
  retryWhen(errors => errors.pipe(
    concatMap((error, iteration) =>
      // exponential delay, capped at maxInterval
      timer(Math.min(Math.pow(2, iteration) * initialInterval, maxInterval))))));
Here are the sources for the more customizable version.
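As a small illustration of the delay formula, here is a sketch in plain TypeScript that assumes the cap is applied with Math.min. The function name cappedBackoffDelay is made up for this example and is not the library's API.

```typescript
// Exponential delay for a zero-based retry index, capped at maxInterval.
// Hypothetical helper; not taken from backoff-rxjs.
function cappedBackoffDelay(iteration: number, initialInterval: number, maxInterval: number): number {
  return Math.min(Math.pow(2, iteration) * initialInterval, maxInterval);
}

const cappedDelays = [0, 1, 2, 3, 4].map(i => cappedBackoffDelay(i, 1000, 5000));
console.log(cappedDelays); // [1000, 2000, 4000, 5000, 5000]
```

Without the cap, the fifth retry would wait 16 seconds; with it, every delay beyond the third stays at maxInterval.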