I am trying to write a (generic) function run<ID, ENTITY>(…): Observable<ENTITY> which takes the following arguments:
- init: () => Observable<ID> which is an initializing request to start a backend process.
- status: (id: ID) => Observable<ENTITY> which takes the generated ID and queries the status for it in the backend.
- repeat: (status: ENTITY) => boolean which determines whether the status request must be repeated.
- initialDelay and repeatDelay.
So run should execute init, then wait for initialDelay seconds. From now on it should run status every repeatDelay seconds until repeat() returns false.
However, there are two important things that need to work:
- repeatDelay should only start counting once status has emitted its value, so as to avoid race conditions if status takes longer than repeatDelay.
- Every status value must also be emitted to the caller.
The following (not very pretty) version does everything except for the first of these requirements: it doesn't wait for the network response before retrying status.
run<ID, ENTITY>(…): Observable<ENTITY> {
    let finished = false;
    return init().mergeMap(id => {
        return Observable.timer(initialDelay, repeatDelay)
            .switchMap(() => {
                if (finished) return Observable.of(null);
                return status(id);
            })
            .takeWhile(response => {
                if (repeat(response)) return true;
                if (finished) return false;
                finished = true;
                return true;
            });
    });
}
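To make the timing problem concrete, here is a rough, RxJS-free sketch in virtual time (no real timers or network). The helper names fixedIntervalSchedule and afterResponseSchedule, the PollCall shape, and the latency numbers are all made up for illustration. The first helper mimics timer(initialDelay, repeatDelay) with switchMap for a fixed number of ticks, where a request still in flight at the next tick is dropped; the second starts each request repeatDelay after the previous response arrived.

```typescript
// Hypothetical record of one status request in virtual time.
interface PollCall { start: number; end: number; cancelled: boolean; }

// Fixed-interval polling, like timer(initialDelay, repeatDelay) + switchMap:
// a request starts on every tick, and a request that is still in flight
// when the next tick fires is dropped (its `end` is then never observed).
function fixedIntervalSchedule(
    initialDelay: number, repeatDelay: number, latencies: number[]
): PollCall[] {
    const calls: PollCall[] = [];
    for (let i = 0; i < latencies.length; i++) {
        const start = initialDelay + i * repeatDelay;
        const end = start + latencies[i];
        const isLast = i === latencies.length - 1;
        // Assumption: a response arriving exactly on the tick still gets through.
        calls.push({ start, end, cancelled: !isLast && latencies[i] > repeatDelay });
    }
    return calls;
}

// Response-driven polling: repeatDelay starts counting only once the
// previous response has arrived, so requests never overlap.
function afterResponseSchedule(
    initialDelay: number, repeatDelay: number, latencies: number[]
): PollCall[] {
    const calls: PollCall[] = [];
    let start = initialDelay;
    for (let i = 0; i < latencies.length; i++) {
        const end = start + latencies[i];
        calls.push({ start, end, cancelled: false });
        start = end + repeatDelay;
    }
    return calls;
}
```

With initialDelay = 1, repeatDelay = 2 and latencies [1, 3, 1], the fixed-interval variant starts requests at 1, 3, 5 and loses the second response, while the response-driven variant starts them at 1, 4, 9.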
My second version is this, which again works for all but one detail: the intermediate values of the status calls aren't emitted, but I do need them in the caller to show the progress:
run<ID, ENTITY>(…): Observable<ENTITY> {
    const loop = id => {
        return status(id).switchMap(response => {
            return repeat(response)
                ? Observable.timer(repeatDelay).switchMap(() => loop(id))
                : Observable.of(response);
        });
    };
    return init()
        .mergeMap(id => Observable.timer(initialDelay).switchMap(() => loop(id)));
}
Admittedly, the latter one also is a bit of a kludge. I'm sure rxjs can solve this problem in a much neater way (and, more importantly, solve it at all), but I can't seem to figure out how.
Update: Observable supports recursion natively with expand, also shown in @IngoBürk's answer. This lets us write the recursion even more concisely:
function run<ENTITY>(/* ... */): Observable<ENTITY> {
    return init().delay(initialDelay).flatMap(id =>
        status(id).expand(s =>
            repeat(s)
                ? Observable.of(null).delay(repeatDelay).flatMap(_ => status(id))
                : Observable.empty()
        )
    );
}
Fiddle.
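For intuition about why expand keeps the intermediate values, here is a synchronous, array-based analogue. It is only a sketch: expandSync, JobStatus, nextStatus and shouldRepeat are hypothetical names, not RxJS APIs, and the fake backend simply adds 40 to the progress on each poll. Like expand, it emits the seed, feeds every emitted value back through the projection, and stops once the projection returns nothing.

```typescript
// Synchronous, array-based analogue of expand (hypothetical helper, not RxJS).
// `limit` is a safety net against projections that never return [].
function expandSync<T>(seed: T, project: (value: T) => T[], limit = 1000): T[] {
    const out: T[] = [];
    const queue: T[] = [seed];
    while (queue.length > 0 && out.length < limit) {
        const value = queue.shift() as T;
        out.push(value); // every value is emitted, including intermediate ones
        const next = project(value);
        for (let i = 0; i < next.length; i++) {
            queue.push(next[i]); // emitted values are fed back into `project`
        }
    }
    return out;
}

// Fake backend (assumption for illustration): progress grows by 40 per poll.
interface JobStatus { progress: number; }
const nextStatus = (s: JobStatus): JobStatus =>
    ({ progress: Math.min(100, s.progress + 40) });
const shouldRepeat = (s: JobStatus): boolean => s.progress < 100;

// Poll again while shouldRepeat holds; an empty array ends the recursion,
// playing the role of Observable.empty() in the snippet above.
const emitted = expandSync<JobStatus>(
    { progress: 20 },
    s => (shouldRepeat(s) ? [nextStatus(s)] : [])
);
// emitted progress values: 20, 60, 100
```

The caller sees all three statuses, not just the final one, which is the behaviour the second version in the question was missing.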
If recursion is acceptable, then you can do things more concisely still:
function run<ID, ENTITY>(/* ... */): Observable<ENTITY> {
    function recurse(id: ID): Observable<ENTITY> {
        // share() so that the merge below triggers only one status request
        const status$ = status(id).share();
        // repeatDelay after the response, either poll again or end the stream
        const tail$ = status$.delay(repeatDelay)
            .flatMap(s => repeat(s) ? recurse(id) : Observable.empty());
        return status$.merge(tail$);
    }
    return init().delay(initialDelay).flatMap(id => recurse(id));
}
Try the fiddle.