Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Repeat request until condition is met and return intermediate values

Tags:

rxjs

rxjs5

I am trying to write a (generic) function run<ID, ENTITY>(…): Observable<ENTITY> which takes the following arguments:

  • A function init: () => Observable<ID> which is an initializing request to start a backend process.
  • A function status: (id: ID) => Observable<ENTITY> which takes the generated ID and queries the status for it in the backend.
  • A function repeat: (status: ENTITY) => boolean which determines whether the status request must be repeated.
  • Two integer values initialDelay and repeatDelay.

So run should execute init, then wait for initialDelay seconds. From now on it should run status every repeatDelay seconds until repeat() returns false.

However, there are two important things that need to work:

  • repeatDelay should only be counted starting when status has emitted its value as to avoid race conditions if status takes longer than repeatDelay
  • The intermediate values emitted by the calls to status must also be emitted to the caller.

The following (not very pretty) version does everything except for the last thing I mentioned: it doesn't wait for the network response before retrying status.

run<ID, ENTITY>(…): Observable<ENTITY> {
    let finished = false;
    return init().mergeMap(id => {
        return Observable.timer(initialDelay, repeatDelay)
            .switchMap(() => {
                if (finished) return Observable.of(null);
                return status(id);
            })
            .takeWhile(response => {
                if (repeat(response)) return true;
                if (finished) return false;

                finished = true;
                return true;
            });
    });
}

My second version is this, which again works for all but one detail: the intermediate values of the status calls aren't emitted, but I do need them in the caller to show the progress:

run<ID, ENTITY>(…): Observable<ENTITY> {
    const loop = id => {
        return status(id).switchMap(response => {
            return repeat(response)
                ? Observable.timer(repeatDelay).switchMap(() => loop(id))
                : Observable.of(response);
        });
    };

    return init()
        .mergeMap(id => Observable.timer(initialDelay).switchMap(() => loop(id)));
}

Admittedly, the latter one also is a bit of a kludge. I'm sure rxjs can solve this problem in a much neater way (and, more importantly, solve it at all), but I can't seem to figure out how.

like image 696
Ingo Bürk Avatar asked Nov 07 '17 12:11

Ingo Bürk


1 Answers

Update: Observable supports recursion natively with expand, also shown in @IngoBürk's answer. This lets us write the recursion even more concisely:

function run<ENTITY>(/* ... */): Observable<ENTITY> {
  return init().delay(initialDelay).flatMap(id =>
    status(id).expand(s => 
      repeat(s) ? Observable.of(null).delay(repeatDelay).flatMap(_ => status(id)) : Observable.empty()
    )
  )
}

Fiddle.


If recursion is acceptable, then you can do things more concisely still:

function run(/* ... */): Observable<ENTITY> {
  function recurse(id: number): Observable<ENTITY> {
    const status$ = status(id).share();
    const tail$ = status$.delay(repeatDelay)
                         .flatMap(status => repeat(status) ? recurse(id, repeatDelay) : Observable.empty());
    return status$.merge(tail$);
  }
  return init().delay(initialDelay).flatMap(id => recurse(id));
}

Try the fiddle.

like image 54
concat Avatar answered Oct 10 '22 00:10

concat