Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJS 5.0 "do while" like mechanism

I'm trying to use RxJS for a simple short poll. It needs to make a request once every delay seconds to the location path on the server, ending once one of two conditions are reached: either the callback isComplete(data) returns true or it has tried the server more than maxTries. Here's the basic code:

newShortPoll(path, maxTries, delay, isComplete) {
    return Observable.interval(delay)
    .take(maxTries)
    .flatMap((tryNumber) => http.get(path))
    .doWhile((data) => !isComplete(data));
  }

However, doWhile doesn't exist in RxJS 5.0, so the condition where it can only try the server maxTries works, thanks to the take() call, but the isComplete condition does not work. How can I make it so the observable will next() values until isComplete returns true, at which point it will next() that value and complete().

I should note that takeWhile() does not work for me here. It does not return the last value, which is actually the most important, since that's when we know it's done.

Thanks!

like image 504
Colton Voege Avatar asked Mar 02 '16 20:03

Colton Voege


2 Answers

We can create a utility function to create a second Observable that emits every item that the inner Observable emits; however, we will call the onCompleted function once our condition is met:

function takeUntilInclusive(inner$, predicate) {
    return Rx.Observable.create(observer => {
        var subscription = inner$.subscribe(item => {
            observer.onNext(item);

            if (predicate(item)) {
                observer.onCompleted();
            }
        }, observer.onError, observer.onCompleted);


        return () => {
            subscription.dispose();
        }
    });
}

And here's a quick snippet using our new utility method:

const inner$ = Rx.Observable.range(0, 4);
const data$ = takeUntilInclusive(inner$, (x) => x > 2);
data$.subscribe(x => console.log(x));

// >> 0
// >> 1
// >> 2
// >> 3

This answer is based off: RX Observable.TakeWhile checks condition BEFORE each element but I need to perform the check after

like image 52
Calvin Belden Avatar answered Oct 05 '22 08:10

Calvin Belden


You can achieve this by using retry and first operators.

// helper observable that can return incomplete/complete data or fail.
var server = Rx.Observable.create(function (observer) {
  var x = Math.random();

  if(x < 0.1) {
    observer.next(true);
  } else if (x < 0.5) {
    observer.error("error");
  } else {
    observer.next(false);
  }
  observer.complete();

  return function () {
  };
});
   
function isComplete(data) {
  return data;
}
  
var delay = 1000;
Rx.Observable.interval(delay)
  .switchMap(() => {
    return server
      .do((data) => {
        console.log('Server returned ' + data);
      }, () => {
        console.log('Server threw');
      })
      .retry(3);
  })
  .first((data) => isComplete(data))
  .subscribe(() => {
    console.log('Got completed value');
  }, () => {
    console.log('Got error');
  });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
like image 22
Sergey Sokolov Avatar answered Oct 05 '22 09:10

Sergey Sokolov