Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Rate limiting observables

Tags:

rx-java

I have an observable that produces data from a fast stream from a database cursor. I'm looking to throttle the output on a rate of x items per second. So far I've been using Callstack blocking as described on the docs:

observable.map(f -> {
ratelimiter.acquire(); // configured limiter to only allow
});

This is working fine, but just out of curiosity is there a better way to handle this using backpressure?

Tks

like image 973
Vinicius Carvalho Avatar asked Nov 14 '14 15:11

Vinicius Carvalho


2 Answers

Use sample (throttleLast) operator:

Observable<T> throttled = 
    observable.sample(1 / rate, TimeUnit.MILLISECONDS);

http://reactivex.io/documentation/operators/sample.html

https://github.com/ReactiveX/RxJava/wiki/Backpressure

like image 129
Paweł Krupiński Avatar answered Dec 16 '22 18:12

Paweł Krupiński


You could try using rx.Observable#onBackpressureBuffer() combined with a custom subscriber that will periodically request n items per second. But, you would be bound to hard one second sampling.

Note .subscribeOn() and .toBlocking() is only to make the main method not exit immediately.

public class BackpressureTest {

  public static void main(final String[] args) {
    Observable.range(1, 1000)
      .compose(Observable::onBackpressureBuffer) // consume source immediately, but buffer it
      .lift(allowPerSecond(3)) // via operator using custom subscriber request n items per second
      .subscribeOn(Schedulers.computation())
      .toBlocking()
      .subscribe(System.out::println);
  }

  private static <T> Observable.Operator<T, T> allowPerSecond(final int n) {
    return upstream -> periodicallyRequestingSubscriber(upstream, n);
  }

  private static <T> Subscriber<T> periodicallyRequestingSubscriber(final Subscriber<T> upstream, final int n) {
    return new Subscriber<T>() {

      @Override
      public void onStart() {
        request(0); // request 0 so that source stops emitting
        Observable.interval(1, SECONDS).subscribe(x -> request(n)); // every second request n items
      }

      @Override
      public void onCompleted() {
        upstream.onCompleted();
      }

      @Override
      public void onError(final Throwable e) {
        upstream.onError(e);
      }

      @Override
      public void onNext(final T integer) {
        upstream.onNext(integer);
      }
    };
  }
}
like image 41
michalsamek Avatar answered Dec 16 '22 17:12

michalsamek