Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJS throttle behavior; get first value immediately

Example Plunkr: https://plnkr.co/edit/NZwb3ol8CbZFtSc6Q9zm?p=preview

I am aware that there are these 3 throttle methods for RxJS (5.0 beta.4):

auditTime(), throttleTime() and debounceTime()

The behavior I am looking for is the one lodash does by default on throttle:

    1. Give me the first value immediately!
    1. on consecutive values, hold values for the given delay, then emit last occurred value
    1. when throttle delay expired, go back to state (1)

In theory this should look like:

inputObservable
  .do(() => cancelPreviousRequest())
  .throttleTime(500)
  .subscribe((value) => doNextRequest(value))

But

  • throttleTime never gives me the last value, if emitted in the throttle timeout
  • debounceTime doesn't trigger immediately
  • auditTime doesn't trigger immediately

Could I combine any of the RxJS methods to achieve the described behavior?

like image 845
sod Avatar asked May 10 '16 18:05

sod


3 Answers

For anyone looking for this after 2018: This has been added over a year ago, but for some reason, the documentation has not been updated yet.

RxJS commit

You can simply pass a config object to throttleTime. The default is { leading: true, trailing: false }. To achieve the behavior discussed here you simply have to set trailing to true: { leading: true, trailing: true }

EDIT:

For completeness, here is a working snippet:

import { asyncScheduler } from 'rxjs'
import { throttleTime } from 'rxjs/operators'

...

observable.pipe(
  throttleTime(100, asyncScheduler, { leading: true, trailing: true })
)
like image 196
Andreas Gassmann Avatar answered Nov 09 '22 06:11

Andreas Gassmann


For older RxJs, I wrote a concatLatest operator that does most of what you want. With it, you could get your throttling behavior with this code:

const delay = Rx.Observable.empty().delay(500);
inputObservable
    .map(value => Rx.Observable.of(value).concat(delay))
    .concatLatest()
    .subscribe(...);

Here's the operator. I took a stab at updating it to work with RxJS5:

Rx.Observable.prototype.concatLatest = function () {
    /// <summary>
    /// Concatenates an observable sequence of observable sequences, skipping sequences that arrive while the current sequence is being observed.
    /// If N new observables arrive while the current observable is being observed, the first N-1 new observables will be thrown
    /// away and only the Nth will be observed.
    /// </summary>
    /// <returns type="Rx.Observable"></returns>
    var source = this;

    return Rx.Observable.create(function (observer) {
        var latest,
            isStopped,
            isBusy,
            outerSubscription,
            innerSubscription,
            subscriptions = new Rx.Subscription(function () {
              if (outerSubscription) {
                outerSubscription.unsubscribe();
              }
              if (innerSubscription) {
                innerSubscription.unsubscribe();
              }
            }),
            onError = observer.error.bind(observer),
            onNext = observer.next.bind(observer),
            innerOnComplete = function () {
                var inner = latest;
                if (inner) {
                    latest = undefined;
                    if (innerSubscription) {
                      innerSubscription.unsubscribe();
                    }
                    innerSubscription = inner.subscribe(onNext, onError, innerOnComplete);
                }
                else {
                    isBusy = false;
                    if (isStopped) {
                        observer.complete();
                    }
                }
            };

        outerSubscription = source.subscribe(function (newInner) {
            if (isBusy) {
                latest = newInner;
            }
            else {
                isBusy = true;
                if (innerSubscription) {
                  innerSubscription.unsubscribe();
                }
                innerSubscription = newInner.subscribe(onNext, onError, innerOnComplete);
            }
        }, onError, function () {
            isStopped = true;
            if (!isBusy) {
                observer.complete();
            }
        });

        return subscriptions;
    });
};

And here's an updated plunkr: https://plnkr.co/edit/DSVmSPRijJwj9msefjRi?p=preview

Note I updated your lodash version to the latest version. In lodash 4.7, I rewrote the throttle/debounce operators to fix some edge case bugs. You were using 4.6.1 which still had some of those bugs, though I don't think they were affecting your test.

like image 3
Brandon Avatar answered Nov 09 '22 05:11

Brandon


I took the auditTime operator and changed 2 lines to achieve the desired behavior.

New plunker: https://plnkr.co/edit/4NkXsOeJOSrLUP9WEtp0?p=preview

Original:

  • https://github.com/ReactiveX/rxjs/blob/master/src/operator/auditTime.ts

Changes:

from (auditTime):

protected _next(value: T): void {
  this.value = value;
  this.hasValue = true;
  if (!this.throttled) {
    this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
  }
}

clearThrottle(): void {
  const { value, hasValue, throttled } = this;
  if (throttled) {
    this.remove(throttled);
    this.throttled = null;
    throttled.unsubscribe();
  }
  if (hasValue) {
    this.value = null;
    this.hasValue = false;
    this.destination.next(value);
  }
}

to (auditTimeImmediate):

protected _next(value: T): void {
    this.value = value;
    this.hasValue = true;
    if (!this.throttled) {
        // change 1:
        this.clearThrottle();
    }
}

clearThrottle(): void {
    const { value, hasValue, throttled } = this;
    if (throttled) {
        this.remove(throttled);
        this.throttled = null;
        throttled.unsubscribe();
    }
    if (hasValue) {
        this.value = null;
        this.hasValue = false;
        this.destination.next(value);
        // change 2:
        this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
    }
}

So I start the timeout after the value was nexted.

Usage:

inputObservable
  .do(() => cancelPreviousRequest())
  .auditTimeImmediate(500)
  .subscribe((value) => doNextRequest(value))
like image 2
sod Avatar answered Nov 09 '22 04:11

sod