
RXJS: throttleTime plus last value

Tags:

rxjs

I have a scenario where a lot of events can be sent to a stream in a short amount of time. I would like to have an operator that is kind of a mixture of debounceTime and throttleTime.

The following demo illustrates what I would like to have: https://stackblitz.com/edit/rxjs6-demo-jxbght?file=index.ts. I would like the subscriber to get the first emitted event and THEN wait for x ms. If more events are emitted during the waiting time, the last event should be sent to the subscriber after the waiting time. The waiting time should be reset on each new emitted event, just like debounce does.

If you click on the button 3 times within 1 second it should print 1 and 3. If you then click only 1 time within 1 second it should print only 4. If you then click 3 times again it should print 5 and 7.
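To pin down the expected output, here is a minimal sketch of the rule as a pure function over timestamped events. It is not an RxJS operator. The names `TimedEvent` and `firstThenDebounced`, the click timestamps, and the strict `<` comparison at the window boundary are all assumptions made for illustration.

```typescript
interface TimedEvent<T> {
  time: number; // milliseconds since some arbitrary start
  value: T;
}

// Hypothetical pure model of the desired behaviour (assumes events are sorted by time):
// - the first event of a burst is emitted immediately;
// - each event arriving less than waitMs after the previous one extends the burst;
// - when the burst ends, its last event is emitted at (last time + waitMs),
//   unless the burst consisted of the first event only.
function firstThenDebounced<T>(events: TimedEvent<T>[], waitMs: number): TimedEvent<T>[] {
  const out: TimedEvent<T>[] = [];
  let first: TimedEvent<T> | undefined; // first event of the current burst
  let last: TimedEvent<T> | undefined;  // most recent event of the current burst

  // Close the current burst: emit its last event, unless it was also the first.
  const flush = (): void => {
    if (first && last && last !== first) {
      out.push({ time: last.time + waitMs, value: last.value });
    }
  };

  for (const ev of events) {
    if (last && ev.time - last.time < waitMs) {
      last = ev;     // still inside the waiting window: extend the burst
    } else {
      flush();       // the previous burst (if any) is over
      out.push(ev);  // a new burst starts: emit immediately
      first = last = ev;
    }
  }
  flush();
  return out;
}

// Assumed timeline for the example: three quick clicks, one lone click, three quick clicks.
const clicks: TimedEvent<number>[] = [
  { time: 0, value: 1 }, { time: 200, value: 2 }, { time: 400, value: 3 },
  { time: 3000, value: 4 },
  { time: 6000, value: 5 }, { time: 6200, value: 6 }, { time: 6400, value: 7 },
];

console.log(firstThenDebounced(clicks, 1000).map(e => e.value)); // [1, 3, 4, 5, 7]
```

A model like this can serve as a reference when checking a real operator against the same timeline.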

This does not work with debounceTime since that doesn't give me the first event and it doesn't work for throttleTime because that doesn't give me the last emitted value after the wait time is over.
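The difference can be illustrated with two simplified models, written as plain functions rather than with RxJS itself. `modelThrottle` approximates throttleTime with its default leading-only behaviour and `modelDebounce` approximates debounceTime. Both names, the `Click` type, and the timeline are assumptions for this sketch.

```typescript
interface Click {
  time: number; // milliseconds since some arbitrary start
  value: number;
}

// Simplified model of throttleTime (leading only):
// emit a value, then ignore everything until durationMs has passed.
function modelThrottle(clicks: Click[], durationMs: number): number[] {
  const out: number[] = [];
  let blockedUntil = -Infinity;
  for (const c of clicks) {
    if (c.time >= blockedUntil) {
      out.push(c.value);
      blockedUntil = c.time + durationMs;
    }
  }
  return out;
}

// Simplified model of debounceTime: a value is emitted only if no other
// value follows it within dueMs (the final value is always emitted).
function modelDebounce(clicks: Click[], dueMs: number): number[] {
  const out: number[] = [];
  clicks.forEach((c, i) => {
    const next = clicks[i + 1];
    if (next === undefined || next.time - c.time >= dueMs) {
      out.push(c.value);
    }
  });
  return out;
}

// Assumed timeline: three quick clicks, one lone click, three quick clicks.
const timeline: Click[] = [
  { time: 0, value: 1 }, { time: 200, value: 2 }, { time: 400, value: 3 },
  { time: 3000, value: 4 },
  { time: 6000, value: 5 }, { time: 6200, value: 6 }, { time: 6400, value: 7 },
];

console.log(modelThrottle(timeline, 1000)); // [1, 4, 5]  (3 and 7 are lost)
console.log(modelDebounce(timeline, 1000)); // [3, 4, 7]  (1 and 5 are lost)
```

Neither result is the wanted 1, 3, 4, 5, 7: the throttle model drops the last value of each burst and the debounce model drops the first.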

Any suggestions on how to implement this?

UPDATE

I created a custom operator with help from Martin's answer. I'm not sure whether it works 100% correctly or whether there are better ways to do it, but it seems to do what I want.

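import { Observable, empty } from 'rxjs';
import { exhaustMap, timeoutWith, debounceTime, take, startWith } from 'rxjs/operators';

// Emits the first value immediately, then the last value of the burst that
// follows once the source has been quiet for waitTime ms.
// Note: source is subscribed to again inside exhaustMap, so a hot source
// (such as a Subject or fromEvent) is assumed.
export function takeFirstThenDebounceTime(waitTime) {
    return function takeFirstThenDebounceTimeImplementation(source) {
        return Observable.create(subscriber => {
            const subscription = source
                .pipe(
                    exhaustMap(val => source.pipe(
                        // Complete the inner stream if the source stays silent for waitTime.
                        timeoutWith(waitTime, empty()),
                        debounceTime(waitTime),
                        take(1),
                        startWith(val)
                    )),
                )
                .subscribe(
                    value => subscriber.next(value),
                    err => subscriber.error(err),
                    () => subscriber.complete()
                );

            return subscription;
        });
    };
}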
Abris asked Aug 27 '18 11:08

1 Answer

This operator should give you the first value, the throttled stream and the last value:

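import { MonoTypeOperatorFunction, Observable, merge } from 'rxjs';
import { debounceTime, throttleTime } from 'rxjs/operators';

// Merges a leading-edge throttle with a debounce of the same duration.
export function throunceTime<T>(duration: number): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) =>
    merge(source.pipe(throttleTime(duration)), source.pipe(debounceTime(duration)))
      // Suppress the duplicate when both branches emit synchronously, one right after the other.
      .pipe(throttleTime(0, undefined, { leading: true, trailing: false }));
}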
Alex Rempel answered Oct 23 '22 15:10