I have a use case where I need an Observable to skip its next emission whenever another notifier Observable emits.
source: |---X---X---X---X---X---X---X---X---X---X--|>
notifier: |-------------X---------X----------X-------|>
result: |---X---X---X-------X---X-------X-------X--|>
Basically, I want an operator called skipNextWhen
that takes in the notifier observable and skips the next emission from the source.
I tried using an implementation that uses the pausable
operator (re-implemented using switchMap
), but couldn't get it to work.
pausable.ts
import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import 'rxjs/add/observable/never';
import 'rxjs/add/operator/startWith';
declare module 'rxjs/Observable' {
interface Observable<T> {
pausable: typeof pausable;
}
}
function pausable<T>(notifier: Observable<boolean>): Observable<T> {
return notifier.startWith(false).switchMap((paused) => {
if (paused) {
return Observable.never();
} else {
const source = new Subject();
this.subscribe(source);
return source;
}
});
}
Observable.prototype.pausable = pausable;
skipNextWhen.ts
import { Observable } from 'rxjs/Observable';
import './pausable';
declare module 'rxjs/Observable' {
interface Observable<T> {
skipNextWhen: typeof skipNextWhen;
}
}
function skipNextWhen<T, R>(other: Observable<T>): Observable<R> {
const notifier = Observable.merge(this.map(() => false),
other.map(() => true));
return this.pausable(notifier);
}
Observable.prototype.skipNextWhen = skipNextWhen;
Is there a more suitable operator that I should consider using instead? The behavior I'm seeing with my current implementation is that the result Observable emits once, and then never again - even if the notifier Observable never emits.
I've started a (very) small library of some rxjs utils I've wanted. It happens to have a function to do exactly what you ask: skipAfter
. From the docs:
source: -1-----2-----3-----4-----5-|
skip$: ----0----------0-0----------
result: -1-----------3-----------5-|
The library is here: https://github.com/simontonsoftware/s-rxjs-utils
I can think of two solutions to this:
Using .filter()
, .do()
and a few side-effects.
This is mayne easier to understand solution even though it's not that "Rx" way:
function skipNextWhen(other) {
let skipNext = false;
return this.merge(other.do(() => skipNext = true).filter(() => false))
.filter(val => {
const doSkip = skipNext;
skipNext = false;
return !doSkip;
});
}
I'm using merge()
just to update skipNext
, other
's value is always ignored.
Using .scan()
:
This solution is without any state variables and side-effects.
function skipNextWhen(other) {
const SKIP = 'skip';
return this.merge(other.mapTo(SKIP))
.scan((acc, val) => {
if (acc === SKIP) {
return null;
} else if (val === SKIP) {
return SKIP;
} else {
return val;
}
}, [])
.filter(val => Boolean(val) && val !== SKIP);
}
Basically, when SKIP
arrives I return it right away because it's going to be passed again in acc
parameter by the scan()
operator and later ignored by filter()
.
If I receive a normal value but the previous value was SKIP
I ignore it and return just null
which is later filter away.
Both solutions give the same result:
Observable.prototype.skipNextWhen = skipNextWhen;
const source = Observable.range(1, 10)
.concatMap(val => Observable.of(val).delay(100));
source
.skipNextWhen(Observable.interval(350))
.subscribe(console.log);
This prints the following:
1
2
3
5
6
8
9
10
Just be aware that you're not in fact creating new operator. You just have a shortcut for an operator chain. This for example doesn't let you unsubscribe from other
when the source completes.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With