I have a PublishSubject that calls onNext()
on some UI event. The subscriber usually takes 2 seconds to complete its job. I need to ignore all calls to onNext()
except the last one while the subscriber is busy. I tried the following, however I'm unable to control the flow. The requests seem to get queued up and each and every request gets processed (and so back pressure isn't seemingly working). How can I make it ignore all requests but the last one? (I don't want to use debounce
as the code needs to react immediately and any reasonably small timeout won't work).
Moreover I realize using subscribeOn
with a subject has no effect, thus I'm using observeOn
to do async work in one of the operators. Is this the correct approach?
Subject<Boolean> loadingQueue = PublishSubject.<Boolean>create().toSerialized();
loadingQueue
.toFlowable(BackpressureStrategy.LATEST)
.observeOn(AndroidSchedulers.mainThread())
.map(discarded -> {
// PRE-LOADING
Log.d("RXLOADING", "PRE-LOADING: " + Thread.currentThread().getName());
return discarded;
})
.observeOn(Schedulers.computation())
.map(b -> {
Log.d("RXLOADING", "LOADING: " + Thread.currentThread().getName());
Thread.sleep(2000);
return b;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(b -> {
Log.d("RXLOADING", "FINISHED: " + Thread.currentThread().getName() + "\n\n");
});
loadingQueue.onNext(true);
loadingQueue.onNext(true);
loadingQueue.onNext(true);
....
The output I see is:
PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
FINISHED: main
Instead I expect the code to do the following (i.e. load once, and while it's loading, back-pressure to hold back on all requests and emit the last one, once the first observer has finished - so in total it should ideally only load twice at most):
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main
You can't do this with observeOn
because it will buffer at least 1 element and thus always execute the "PRE-LOADING" stage if there is already one "LOADING" happening.
However you can do it with delay
as it doesn't manipulate request amounts on the chain and schedules each onNext individually on the scheduler without queueing it on its own:
public static void main(String[] args) throws Exception {
Subject<Boolean> loadingQueue =
PublishSubject.<Boolean>create().toSerialized();
loadingQueue
.toFlowable(BackpressureStrategy.LATEST)
.delay(0, TimeUnit.MILLISECONDS, Schedulers.single()) // <-------
.map(discarded -> {
// PRE-LOADING
System.out.println("PRE-LOADING: "
+ Thread.currentThread().getName());
return discarded;
})
.delay(0, TimeUnit.MILLISECONDS, Schedulers.computation()) // <-------
.map(b -> {
System.out.println("LOADING: "
+ Thread.currentThread().getName());
Thread.sleep(2000);
return b;
})
.delay(0, TimeUnit.MILLISECONDS, Schedulers.single()) // <-------
.rebatchRequests(1) // <----------------------------------- one-by-one
.subscribe(b -> {
System.out.println("FINISHED: "
+ Thread.currentThread().getName() + "\n\n");
});
loadingQueue.onNext(true);
loadingQueue.onNext(true);
loadingQueue.onNext(true);
Thread.sleep(10000);
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With