Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Rxjs observable wait until some condition is met

I have the following retry logic to retry an operation. It works fine for single request. For multiple on going requests, I would like to wait for existing retry logic to complete before retrying.

handleError(errors: Observable<any>) {

    const retryCountStart: number = 1;

    // wait if there is any existing operation retrying
    // once it is complete, continue here

    return errors
        .mergeScan<any, any>(
        (retryCount: any, err: any) => {

            if (retryCount <= 5) {
                return Observable.of(retryCount + 1);
            } 

        },retryCountStart)
        .delay(1000);
}

How can I add delay until some condition is met in the above method?

like image 983
developer Avatar asked Aug 18 '17 10:08

developer


People also ask

Which operator allows you to wait for a defined delay until an item is emitted?

The Delay operator modifies its source Observable by pausing for a particular increment of time (that you specify) before emitting each of the source Observable's items.

What is pipe operator in RXJS?

A Pipeable Operator is a function that takes an Observable as its input and returns another Observable. It is a pure operation: the previous Observable stays unmodified. A Pipeable Operator is essentially a pure function which takes one Observable as input and generates another Observable as output.


1 Answers

As i understood you want to start next stream only after previous has completed (i.e. add stream to queue)

import { Observable, of, BehaviorSubject, from } from 'rxjs';
import { tap, finalize, filter, take, switchMap, delay } from 'rxjs/operators';

class StreamQueue {
  lastStreamCompleted$: Observable<boolean> = new BehaviorSubject(true);

  private runAfter<T>(lastStreamCompleted$: Observable<boolean>, stream$: Observable<T>): [Observable<boolean>, Observable<T>] {
    const newLastStreamCompleted$ = new BehaviorSubject(false);
    const newStream$ = lastStreamCompleted$
      .pipe(
        filter(lastStreamCompleted => lastStreamCompleted),
        take(1),
        switchMap(() => stream$),
        finalize(() => newLastStreamCompleted$.next(true)),
    );
    return [newLastStreamCompleted$, newStream$];
  }

  add(stream$: Observable<any>) {
    const [newLastStreamCompleted$, newStream$] = this.runAfter(this.lastStreamCompleted$, stream$);
    this.lastStreamCompleted$ = newLastStreamCompleted$;
    return newStream$;
  }
}

const streamQueue = new StreamQueue();

streamQueue.add(from([1, 2]).pipe(delay(100))).subscribe(console.log);
setTimeout(()=>streamQueue.add(from([21, 22]).pipe(delay(100))).subscribe(console.log), 100);
streamQueue.add(from([11, 12]).pipe(delay(100))).subscribe(console.log);

// Output:
// 1
// 2
// 11
// 12
// 21
// 22
like image 104
Buggy Avatar answered Sep 16 '22 11:09

Buggy