I have a scenario where a lot of events can be sent to a stream in a short amount of time. I would like to have an operator that is a mixture of debounceTime and throttleTime.
The following demo illustrates what I would like to have: https://stackblitz.com/edit/rxjs6-demo-jxbght?file=index.ts. I would like the subscriber to get the first emitted event and THEN wait for x ms. If more events are emitted during the waiting time, the last of them should be sent to the subscriber once the waiting time is over. The waiting time should be reset on each newly emitted event, just like debounce does.
If you click on the button 3 times within 1 second it should print 1 and 3. If you then click only 1 time within 1 second it should print only 4. If you then click 3 times again it should print 5 and 7.
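To make the expected behaviour concrete, here is a minimal sketch that models it over pre-recorded events, without RxJS. The names TimedEvent and expectedEmissions are hypothetical, and the sketch assumes that a gap of exactly the waiting time starts a new burst.

```typescript
// Hypothetical model of the desired behaviour; not an RxJS operator.
interface TimedEvent<T> {
  time: number; // milliseconds since the start of the recording
  value: T;
}

function expectedEmissions<T>(events: TimedEvent<T>[], waitMs: number): TimedEvent<T>[] {
  const emitted: TimedEvent<T>[] = [];
  let first: TimedEvent<T> | undefined; // first event of the current burst
  let last: TimedEvent<T> | undefined;  // most recent event of the current burst

  // When a burst ends, its last event is emitted after the wait,
  // unless the burst consisted of a single event.
  const closeBurst = () => {
    if (first !== undefined && last !== undefined && last !== first) {
      emitted.push({ time: last.time + waitMs, value: last.value });
    }
  };

  for (const event of events) {
    // Assumption: a gap of waitMs or more since the previous event starts a new burst.
    if (last === undefined || event.time - last.time >= waitMs) {
      closeBurst();
      first = event;
      emitted.push({ time: event.time, value: event.value }); // first event passes immediately
    }
    last = event; // every event resets the waiting time
  }
  closeBurst();
  return emitted;
}

// Three quick clicks, one isolated click, then three quick clicks again.
const clicks: TimedEvent<number>[] = [
  { time: 0, value: 1 }, { time: 200, value: 2 }, { time: 400, value: 3 },
  { time: 3000, value: 4 },
  { time: 6000, value: 5 }, { time: 6200, value: 6 }, { time: 6400, value: 7 },
];
const emittedValues = expectedEmissions(clicks, 1000).map(e => e.value);
console.log(emittedValues); // [1, 3, 4, 5, 7]
```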
This does not work with debounceTime, since that doesn't give me the first event, and it doesn't work with throttleTime, because that doesn't give me the last emitted value after the wait time is over.
Any suggestions on how to implement this?
UPDATE
I created a custom operator with help from Martin's answer. I'm not sure whether it works 100% correctly or whether there are better ways to do it, but it seems to do what I want.
import { Observable, empty } from 'rxjs';
import { exhaustMap, timeoutWith, debounceTime, take, startWith } from 'rxjs/operators';

export function takeFirstThenDebounceTime(waitTime) {
  return function takeFirstThenDebounceTimeImplementation(source) {
    return Observable.create(subscriber => {
      const subscription = source
        .pipe(
          exhaustMap(val => source.pipe(
            timeoutWith(waitTime, empty()),
            debounceTime(waitTime),
            take(1),
            startWith(val)
          )),
        )
        .subscribe(
          value => {
            subscriber.next(value);
          },
          err => subscriber.error(err),
          () => subscriber.complete()
        );
      return subscription;
    });
  };
}
throttleTime emits the source Observable values on the output Observable when its internal timer is disabled, and ignores source values when the timer is enabled. Initially, the timer is disabled. As soon as the first source value arrives, it is forwarded to the output Observable, and then the timer is enabled.
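The timer behaviour described above can be sketched over pre-recorded events. This is only an illustrative model, not the RxJS implementation: simulateThrottle and TimedValue are hypothetical names, and the sketch assumes the timer counts as disabled again once exactly the full duration has elapsed.

```typescript
// Hypothetical model of leading-only throttling; not the RxJS source code.
interface TimedValue<T> {
  time: number; // milliseconds since the start of the recording
  value: T;
}

function simulateThrottle<T>(events: TimedValue<T>[], durationMs: number): T[] {
  const forwarded: T[] = [];
  let timerEndsAt = -Infinity; // initially, the timer is disabled

  for (const event of events) {
    // Assumption: the timer is disabled again once durationMs has fully elapsed.
    if (event.time >= timerEndsAt) {
      forwarded.push(event.value);            // timer disabled: forward the value
      timerEndsAt = event.time + durationMs;  // ...and enable the timer
    }
    // Timer enabled: the value is ignored.
  }
  return forwarded;
}

const throttled = simulateThrottle(
  [
    { time: 0, value: 'a' },
    { time: 200, value: 'b' },
    { time: 400, value: 'c' },
    { time: 1500, value: 'd' },
  ],
  1000
);
console.log(throttled); // ['a', 'd']
```

Note that 'c', the last value of the first burst, is dropped, which is why throttling alone does not cover the scenario in the question.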
The following operator should give you the first value, the throttled stream, and the last value:
import { merge, MonoTypeOperatorFunction, Observable } from 'rxjs';
import { debounceTime, throttleTime } from 'rxjs/operators';

export function throunceTime<T>(duration: number): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) =>
    merge(source.pipe(throttleTime(duration)), source.pipe(debounceTime(duration)))
      .pipe(throttleTime(0, undefined, { leading: true, trailing: false }));
}