I have an observable that produces data from a fast stream backed by a database cursor. I'm looking to throttle the output to a rate of x items per second. So far I've been using call-stack blocking as described in the docs:
observable.map(f -> {
    ratelimiter.acquire(); // limiter configured to only allow x permits per second
    return f;
});
This is working fine, but just out of curiosity, is there a better way to handle this using backpressure?
Thanks
Use the sample (throttleLast) operator. Note that the sampling period for a rate of x items per second is 1000 / rate milliseconds:
Observable<T> throttled =
    observable.sample(1000 / rate, TimeUnit.MILLISECONDS);
http://reactivex.io/documentation/operators/sample.html
https://github.com/ReactiveX/RxJava/wiki/Backpressure
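To make concrete what sample does with such a period, here is a plain-Java sketch of its windowing behavior, independent of RxJava: from a timestamped stream, keep only the last item seen in each period and drop the rest. SampleDemo and sampleLast are hypothetical names for illustration, not library APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class SampleDemo {

    // From values arriving at timesMs, keep only the last value
    // of each periodMs window (windows with no items emit nothing).
    static List<Integer> sampleLast(long[] timesMs, int[] values, long periodMs) {
        List<Integer> out = new ArrayList<>();
        long windowEnd = periodMs;
        Integer last = null;
        for (int i = 0; i < values.length; i++) {
            // close any windows that ended before this item arrived
            while (timesMs[i] >= windowEnd) {
                if (last != null) {
                    out.add(last);
                    last = null;
                }
                windowEnd += periodMs;
            }
            last = values[i];
        }
        if (last != null) {
            out.add(last); // flush the final window
        }
        return out;
    }

    public static void main(String[] args) {
        long[] t = {10, 20, 480, 510, 990, 1200};
        int[] v = {1, 2, 3, 4, 5, 6};
        // 500 ms windows: items 1-3 fall in the first window, 4-5 in the
        // second, 6 in the third, so only the last of each survives.
        System.out.println(sampleLast(t, v, 500)); // prints [3, 5, 6]
    }
}
```

The trade-off versus the rate limiter in the question: sample drops intermediate items rather than delaying them, so it only fits if you can afford to lose data between ticks.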
You could try using rx.Observable#onBackpressureBuffer()
combined with a custom subscriber that periodically requests n
items per second. But you would be bound to a hard one-second sampling window.
Note that .subscribeOn()
and .toBlocking()
are only there to keep the main method from exiting immediately.
import static java.util.concurrent.TimeUnit.SECONDS;

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class BackpressureTest {

    public static void main(final String[] args) {
        Observable.range(1, 1000)
                .compose(Observable::onBackpressureBuffer) // consume the source immediately, but buffer it
                .lift(allowPerSecond(3)) // operator whose custom subscriber requests n items per second
                .subscribeOn(Schedulers.computation())
                .toBlocking()
                .subscribe(System.out::println);
    }

    private static <T> Observable.Operator<T, T> allowPerSecond(final int n) {
        return downstream -> periodicallyRequestingSubscriber(downstream, n);
    }

    private static <T> Subscriber<T> periodicallyRequestingSubscriber(final Subscriber<T> downstream, final int n) {
        return new Subscriber<T>() {
            @Override
            public void onStart() {
                request(0); // request 0 so that the source stops emitting
                Observable.interval(1, SECONDS).subscribe(tick -> request(n)); // every second request n more items
            }

            @Override
            public void onCompleted() {
                downstream.onCompleted();
            }

            @Override
            public void onError(final Throwable e) {
                downstream.onError(e);
            }

            @Override
            public void onNext(final T item) {
                downstream.onNext(item);
            }
        };
    }
}
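The "allow n items per second" policy driving that subscriber can also be expressed without RxJava. Below is a minimal, deterministic token-bucket sketch in plain Java; TokenBucket is an illustrative name, not a library class, and the caller supplies the clock so the logic is testable without real sleeps.

```java
// Token bucket: refills at perSecond tokens per second, capped at perSecond,
// and lets an item through only when at least one whole token is available.
public class TokenBucket {
    private final int perSecond;
    private double tokens;
    private long lastRefillMs;

    public TokenBucket(int perSecond, long nowMs) {
        this.perSecond = perSecond;
        this.tokens = perSecond; // start with a full bucket
        this.lastRefillMs = nowMs;
    }

    // Returns true if an item may pass at time nowMs.
    public boolean tryAcquire(long nowMs) {
        double refill = (nowMs - lastRefillMs) / 1000.0 * perSecond;
        tokens = Math.min(perSecond, tokens + refill);
        lastRefillMs = nowMs;
        if (tokens >= 1) {
            tokens -= 1;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        TokenBucket bucket = new TokenBucket(3, 0);
        System.out.println(bucket.tryAcquire(0));    // true  (3 tokens -> 2)
        System.out.println(bucket.tryAcquire(0));    // true  (2 -> 1)
        System.out.println(bucket.tryAcquire(0));    // true  (1 -> 0)
        System.out.println(bucket.tryAcquire(0));    // false (bucket empty)
        System.out.println(bucket.tryAcquire(1000)); // true  (one second refills it)
    }
}
```

Unlike the interval-driven subscriber above, a token bucket smooths the allowance continuously instead of releasing n items in a burst at each one-second tick.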