
RxJS: debounceTime return all values

Is it possible to emit a sequence's values after a delay, once no further events are arriving, and get all of the values at once?

I need something like debounceTime, but one that returns all of the values rather than only the last one.

pandomic asked Apr 11 '17 16:04


2 Answers

If I understand correctly, you need an operator that buffers events until no event has occurred for a certain period of time, and then emits all of the buffered events. I would try this:

Add this new operator to the Observable prototype:

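import { Observable } from 'rxjs/Rx'; // RxJS 5 style: loads all prototype operators

function bufferedDebounceTime(time) {
    // `this` is the source Observable the operator is called on
    return Observable.create(subscriber => {
        let buffer = [];
        return this.do(x => buffer.push(x)) // collect every incoming value
            .debounceTime(time)             // wait for `time` ms of silence
            .flatMap(() => buffer)          // re-emit the collected values one by one
            .do(() => buffer = [])          // start a fresh buffer for the next burst
            .subscribe(
                value => subscriber.next(value),
                err => subscriber.error(err),
                () => subscriber.complete()
            );
    });
}

Observable.prototype.bufferedDebounceTime = bufferedDebounceTime;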

Then use it as an operator:

yourSourceObservable.bufferedDebounceTime(1000).subscribe(...)
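To make the timing logic explicit, here is a minimal sketch of the same buffer-until-quiet idea in plain TypeScript, without RxJS. The names createBufferedDebouncer, Schedule and scheduleWithSetTimeout are hypothetical helpers made up for this illustration, not part of any library. Unlike the operator above, which re-emits the buffered values one by one, this sketch hands over the whole buffer as a single array, which may be closer to what the question asks for.

```typescript
// Hypothetical helper types for this sketch (not part of RxJS).
type Cancel = () => void;
type Schedule = (callback: () => void, ms: number) => Cancel;

// Default scheduler backed by setTimeout; returns a function that cancels the timer.
const scheduleWithSetTimeout: Schedule = (callback, ms) => {
  const handle = setTimeout(callback, ms);
  return () => clearTimeout(handle);
};

// Collects pushed values and passes them to `onFlush` as one array once
// no new value has been pushed for `waitMs` milliseconds.
function createBufferedDebouncer<T>(
  waitMs: number,
  onFlush: (values: T[]) => void,
  schedule: Schedule = scheduleWithSetTimeout,
) {
  let buffer: T[] = [];
  let cancelPending: Cancel | undefined;

  const flush = (): void => {
    // Drop any pending timer, then emit the buffer if it holds anything.
    cancelPending?.();
    cancelPending = undefined;
    if (buffer.length === 0) return;
    const values = buffer;
    buffer = [];
    onFlush(values);
  };

  const push = (value: T): void => {
    buffer.push(value);
    // Every new value restarts the quiet period.
    cancelPending?.();
    cancelPending = schedule(flush, waitMs);
  };

  return { push, flush };
}
```

For example, after createBufferedDebouncer<string>(1000, values => console.log(values)), calling push('a') and push('b') back to back logs a single array containing both values about one second after the second call. The scheduler is injectable only so that the logic can be exercised without real timers.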
ZahiC answered Oct 02 '22 23:10


EDIT

bufferTime does indeed keep firing on a fixed interval forever, so it is not recommended here. As Jørgen Tvedt commented, debounceTime combined with buffer is what you are looking for.

Because this is such a useful operator, I wrote a custom one that makes it much easier to use, and called it bufferDebounce.

I wrote it as an OperatorFunction in TypeScript so that types are inferred along the pipeline:

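import { Observable, OperatorFunction } from 'rxjs';
import { buffer, debounceTime } from 'rxjs/operators';

type BufferDebounce = <T>(debounce: number) => OperatorFunction<T, T[]>;

// Note: `source` is subscribed twice (once for the values, once for the
// debounced notifier), so a cold source will be run twice.
const bufferDebounce: BufferDebounce = debounce => source =>
  new Observable(observer =>
    source.pipe(buffer(source.pipe(debounceTime(debounce)))).subscribe({
      next(x) {
        observer.next(x);
      },
      error(err) {
        observer.error(err);
      },
      complete() {
        observer.complete();
      },
    })
  );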

You can test it yourself in this working example https://stackblitz.com/edit/rxjs6-buffer-debounce

PREVIOUS ANSWER

With RxJS 6+ you can do this very easily.

As ZahiC mentioned, the answer is to use buffers; specifically, you can do all of this with bufferTime.

So wherever your source is (as an Observable), you can write:

    import { bufferTime } from 'rxjs/operators';
    // Collects all emitted values and emits them as an array every 2 seconds
    const example = source.pipe(bufferTime(2000));
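The EDIT above notes that bufferTime keeps firing on a fixed interval, while a debounce-based buffer only fires after a quiet period. The following is a rough sketch of that difference in plain TypeScript, applied to a list of timestamped events instead of a live stream. The functions groupByFixedWindows and groupByQuietGaps and the TimedValue type are hypothetical names for this illustration, the events are assumed to be sorted by time, and the handling of events that land exactly on a boundary is simplified.

```typescript
// Hypothetical event shape: `at` is the time in ms since subscription.
interface TimedValue<T> {
  at: number;
  value: T;
}

// Fixed windows: roughly how bufferTime(windowMs) slices time. One array is
// produced per window, including empty arrays for windows with no events.
function groupByFixedWindows<T>(
  events: TimedValue<T>[],
  windowMs: number,
  untilMs: number,
): T[][] {
  const windows: T[][] = [];
  for (let start = 0; start < untilMs; start += windowMs) {
    windows.push(
      events
        .filter(e => e.at >= start && e.at < start + windowMs)
        .map(e => e.value),
    );
  }
  return windows;
}

// Quiet gaps: roughly how a debounce-based buffer groups values. A group is
// closed once at least `quietMs` has passed without a new event.
function groupByQuietGaps<T>(events: TimedValue<T>[], quietMs: number): T[][] {
  const groups: T[][] = [];
  let current: T[] = [];
  let lastAt: number | undefined;
  for (const e of events) {
    if (lastAt !== undefined && e.at - lastAt >= quietMs) {
      groups.push(current);
      current = [];
    }
    current.push(e.value);
    lastAt = e.at;
  }
  if (current.length > 0) groups.push(current);
  return groups;
}

const sampleEvents: TimedValue<string>[] = [
  { at: 1900, value: 'a' },
  { at: 2100, value: 'b' },
  { at: 2300, value: 'c' },
  { at: 5000, value: 'd' },
];

const fixedWindows = groupByFixedWindows(sampleEvents, 2000, 8000);
const quietGaps = groupByQuietGaps(sampleEvents, 1000);
```

With these sample events, the fixed 2000 ms windows split the burst a, b, c across two arrays and also produce an empty array for the last window, whereas the quiet-gap grouping keeps a, b, c together and puts d in its own group.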
Ignacio Bustos answered Oct 03 '22 00:10