I'm trying to use RxJS for a simple short poll. It needs to make a request once every delay
seconds to the location path
on the server, ending once one of two conditions is reached: either the callback isComplete(data)
returns true or it has tried the server more than maxTries
times. Here's the basic code:
newShortPoll(path, maxTries, delay, isComplete) {
return Observable.interval(delay)
.take(maxTries)
.flatMap((tryNumber) => http.get(path))
.doWhile((data) => !isComplete(data));
}
However, doWhile doesn't exist in RxJS 5.0, so the condition where it can only try the server maxTries
times works, thanks to the take() call, but the isComplete
condition does not work. How can I make it so the observable will next() values until isComplete returns true, at which point it will next() that value and complete()?
I should note that takeWhile()
does not work for me here. It does not return the last value, which is actually the most important, since that's when we know it's done.
Thanks!
We can write a utility function that builds a second Observable which emits every item the inner Observable emits, but calls onCompleted as soon as our condition is met:
/**
 * Mirrors `inner$`, but completes right after emitting the first item for
 * which `predicate` returns a truthy value (that item IS emitted).
 *
 * Uses the observer/disposable method names `onNext` / `onError` /
 * `onCompleted` / `dispose`.
 *
 * @param inner$ - source exposing `subscribe(onNext, onError, onCompleted)`
 *   and returning an object with a `dispose()` method.
 * @param {function(*): boolean} predicate - called with every item after it
 *   has been forwarded; a truthy result completes the output.
 * @returns the observable built by `Rx.Observable.create`.
 */
function takeUntilInclusive(inner$, predicate) {
  return Rx.Observable.create((observer) => {
    const subscription = inner$.subscribe(
      (item) => {
        // Forward first, test second: this is what makes the cut-off inclusive.
        observer.onNext(item);
        if (predicate(item)) {
          observer.onCompleted();
        }
      },
      // Call through `observer` rather than handing over `observer.onError` /
      // `observer.onCompleted` as bare references: detached methods lose their
      // `this`, so the source's own error/completion would not reach `observer`.
      (error) => observer.onError(error),
      () => observer.onCompleted()
    );
    // Tear-down: release the subscription to the source.
    return () => {
      subscription.dispose();
    };
  });
}
And here's a quick snippet using our new utility method:
// Demo: the predicate first holds for the item 3, which is still logged
// before the stream ends.
const inner$ = Rx.Observable.range(0, 4);
const data$ = takeUntilInclusive(inner$, function exceedsTwo(x) {
  return x > 2;
});
data$.subscribe((value) => {
  console.log(value);
});
// >> 0
// >> 1
// >> 2
// >> 3
This answer is based on: RX Observable.TakeWhile checks condition BEFORE each element but I need to perform the check after
You can achieve this by using retry and first operators.
// Stand-in for the server. Each run of the callback draws one random number
// and then either emits `true` (draw < 0.1), fails with "error"
// (0.1 <= draw < 0.5), or emits `false` (draw >= 0.5).
var server = Rx.Observable.create(function (observer) {
  const roll = Math.random();
  if (roll >= 0.5) {
    observer.next(false);
  } else if (roll >= 0.1) {
    observer.error("error");
  } else {
    observer.next(true);
  }
  // Reached on every path, including right after the error branch.
  observer.complete();
  // Nothing to clean up on teardown.
  return function () {};
});
/**
 * Completion check for the demo: the server payload itself is the answer.
 *
 * @param {*} data - value emitted by the server observable.
 * @returns {*} `data`, handed back unchanged.
 */
function isComplete(data) {
  const isDone = data;
  return isDone;
}
// Poll period handed to Rx.Observable.interval.
var delay = 1000;

Rx.Observable.interval(delay)
  // Each tick maps to the `server` observable, with logging attached and
  // `retry(3)` applied to that inner stream.
  .switchMap(() =>
    server
      .do(
        (data) => console.log('Server returned ' + data),
        () => console.log('Server threw')
      )
      .retry(3)
  )
  // Only a value accepted by isComplete gets through.
  .first((data) => isComplete(data))
  .subscribe({
    next: () => console.log('Got completed value'),
    error: () => console.log('Got error'),
  });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
If you love us, you can donate via PayPal or buy us a coffee so we can maintain and grow! Thank you!
Donate Us With