I am trying to write a (generic) function run<ID, ENTITY>(…): Observable<ENTITY> which takes the following arguments:
init: () => Observable<ID>, which is an initializing request to start a backend process.
status: (id: ID) => Observable<ENTITY>, which takes the generated ID and queries the status for it in the backend.
repeat: (status: ENTITY) => boolean, which determines whether the status request must be repeated.
initialDelay and repeatDelay.
So run should execute init, then wait for initialDelay seconds. From now on it should run status every repeatDelay seconds until repeat() returns false.
However, there are two important things that need to work:
repeatDelay should only be counted starting when status has emitted its value, so as to avoid race conditions if status takes longer than repeatDelay.
Every value emitted by status must also be emitted to the caller.
The following (not very pretty) version does everything except for the first requirement: it doesn't wait for the network response before retrying status.
    run<ID, ENTITY>(…): Observable<ENTITY> {
        let finished = false;
        return init().mergeMap(id => {
            return Observable.timer(initialDelay, repeatDelay)
                .switchMap(() => {
                    if (finished) return Observable.of(null);
                    return status(id);
                })
                .takeWhile(response => {
                    if (repeat(response)) return true;
                    if (finished) return false;
                    finished = true;
                    return true;
                });
        });
    }
My second version is this, which again works for all but one detail: the intermediate values of the status calls aren't emitted, but I do need them in the caller to show the progress:
    run<ID, ENTITY>(…): Observable<ENTITY> {
        const loop = id => {
            return status(id).switchMap(response => {
                return repeat(response)
                    ? Observable.timer(repeatDelay).switchMap(() => loop(id))
                    : Observable.of(response);
            });
        };
        return init()
            .mergeMap(id => Observable.timer(initialDelay).switchMap(() => loop(id)));
    }
Admittedly, the latter one is also a bit of a kludge. I'm sure RxJS can solve this problem in a much neater way (and, more importantly, solve it at all), but I can't seem to figure out how.
Update: Observable supports recursion natively with expand, also shown in @IngoBürk's answer. This lets us write the recursion even more concisely:
    function run<ENTITY>(/* ... */): Observable<ENTITY> {
        return init().delay(initialDelay).flatMap(id =>
            status(id).expand(s =>
                repeat(s) ? Observable.of(null).delay(repeatDelay).flatMap(_ => status(id)) : Observable.empty()
            )
        )
    }
Fiddle.
If recursion is acceptable, then you can do things more concisely still:
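    function run<ID, ENTITY>(/* ... */): Observable<ENTITY> {
        function recurse(id: ID): Observable<ENTITY> {
            // share() so the status request is issued once for both consumers below
            const status$ = status(id).share();
            // once the response has arrived, wait repeatDelay and poll again if needed
            const tail$ = status$.delay(repeatDelay)
                .flatMap(s => repeat(s) ? recurse(id) : Observable.empty());
            // emit the current status first, then whatever the next round produces
            return status$.merge(tail$);
        }
        return init().delay(initialDelay).flatMap(id => recurse(id));
    }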
Try the fiddle.
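For comparison, the ordering both snippets aim for (wait for each status response, pass it to the caller, and only then start the repeat delay) can be sketched without RxJS. This is not the RxJS solution, just a dependency-free illustration using an async generator. It assumes init and status return Promises instead of Observables and that both delays are given in milliseconds; the sleep helper is a hypothetical name introduced for this sketch.

```typescript
// Hypothetical helper: resolves after `ms` milliseconds.
const sleep = (ms: number): Promise<void> =>
  new Promise<void>(resolve => setTimeout(resolve, ms));

// Sketch only: Promise-based stand-in for the Observable-based run() above.
// Assumption: init/status return Promises and delays are in milliseconds.
async function* run<ID, ENTITY>(
  init: () => Promise<ID>,
  status: (id: ID) => Promise<ENTITY>,
  repeat: (status: ENTITY) => boolean,
  initialDelay: number,
  repeatDelay: number,
): AsyncGenerator<ENTITY> {
  const id = await init();
  await sleep(initialDelay);
  while (true) {
    // Wait for the response, so two status requests never overlap.
    const current = await status(id);
    // Hand every value, intermediate or final, to the caller.
    yield current;
    if (!repeat(current)) return;
    // The repeat delay starts only after the response has arrived.
    await sleep(repeatDelay);
  }
}
```

A caller would consume this with for await (const s of run(...)) and receive each status in order, which matches the emission order of the expand version above.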