I need an operator similar to exahustMap
, but which remembers the last skipped observable and executes it after the current observable completes.
For example, considering the marble diagram for exhaustMap
:
In my case, after the blue values are emitted, it will be followed by three values of 50. Of course, in this case it looks just like concatMap
, but if there were also a 4 between 3 and 5, it would not reflect in the output.
I've managed to write my own operator similar to how exhaustMap
is implemented:
function exhaustLatestMap<T, R>(project: (value: T) => Subscribable<R>): OperatorFunction<T, R> {
return source => new Observable<R>(observer =>
source.subscribe(new ExhaustLatestMapOperatorSubscriber(observer, project)));
}
class ExhaustLatestMapOperatorSubscriber<T, R> implements Observer<T> {
constructor(
private observer: Subscriber<R>,
private project: (value: T) => Subscribable<R>) { }
innerSub: AnonymousSubscription = null;
latestValue: T;
next(value: T) {
this.processNext(value);
}
error(err) {
this.observer.error(err);
}
complete() {
this.observer.complete();
}
private processNext(value: T) {
this.latestValue = value;
if (!this.innerSub) {
this.innerSub = this.project(value).subscribe({
next: v => this.observer.next(v),
error: err => {
this.observer.error(err);
this.endInnerSub(value)
},
complete: () => {
this.endInnerSub(value);
}
});
}
}
private endInnerSub(value: T) {
this.innerSub.unsubscribe();
this.innerSub = null;
if (this.latestValue !== value) {
this.processNext(this.latestValue);
}
}
}
But I was wondering if there is a way to implement it just by reusing and combining existing operators. Any ideas?
It is possible to implement it using just the built-in factories and operators. However, AFAICT, it cannot be done without managing some per-subscription state.
Fortunately, the defer
factory function makes managing per-subscription state relatively straightforward and safe. And, in addition to helping manage per-subscription state, defer
can be used as a mechanism for being notified of when an observable is subscribed to.
An alternative implementation:
const {
concat,
defer,
EMPTY,
merge,
of
} = rxjs;
const {
delay,
mergeMap,
tap
} = rxjs.operators;
const exhaustMapLatest = project => source => defer(() => {
let latestValue;
let hasLatestValue = false;
let isExhausting = false;
const next = value => defer(() => {
if (isExhausting) {
latestValue = value;
hasLatestValue = true;
return EMPTY;
}
hasLatestValue = false;
isExhausting = true;
return project(value).pipe(
tap({ complete: () => isExhausting = false }),
s => concat(s, defer(() => hasLatestValue ?
next(latestValue) :
EMPTY
))
);
});
return source.pipe(mergeMap(next));
});
const source = merge(
of(0).pipe(delay(0)),
of(1000).pipe(delay(1000)),
of(1100).pipe(delay(1100)),
of(1200).pipe(delay(1200)),
of(2000).pipe(delay(2000))
);
source.pipe(
exhaustMapLatest(value => merge(
of(`${value}:0`).pipe(delay(0)),
of(`${value}:150`).pipe(delay(150)),
of(`${value}:300`).pipe(delay(300))
))
).subscribe(value => console.log(value));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>
There are a couple of behavioural differences between this implementation and yours:
hasLatestValue
flag rather than an equality check, so if the latest value is equal to the initial value, it is still projected.I'm not advocating that it should be implemented this way. The answer is just to show an alternative implementation.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With