
RxJava Subject with Backpressure - only let the last value emit once downstream has finished consuming

I have a PublishSubject whose onNext() is called on some UI event. The subscriber usually takes 2 seconds to complete its job. While the subscriber is busy, I need to ignore all calls to onNext() except the last one. I tried the following, but I can't control the flow: the requests seem to get queued up and every single one gets processed, so backpressure doesn't appear to be working. How can I make it ignore all requests but the last one? (I don't want to use debounce, because the code needs to react immediately and any reasonably small timeout won't work.)

I also realize that subscribeOn has no effect on a subject, so I'm using observeOn to do the async work in one of the operators. Is this the correct approach?

Subject<Boolean> loadingQueue = PublishSubject.<Boolean>create().toSerialized();

loadingQueue
  .toFlowable(BackpressureStrategy.LATEST)
  .observeOn(AndroidSchedulers.mainThread())
  .map(discarded -> {
    // PRE-LOADING
    Log.d("RXLOADING", "PRE-LOADING: " + Thread.currentThread().getName());
    return discarded;
  })
  .observeOn(Schedulers.computation())
  .map(b -> {
    Log.d("RXLOADING", "LOADING: " + Thread.currentThread().getName());
    Thread.sleep(2000);
    return b;
  })
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(b -> {
    Log.d("RXLOADING", "FINISHED: " + Thread.currentThread().getName() + "\n\n");
  });


loadingQueue.onNext(true);
loadingQueue.onNext(true);
loadingQueue.onNext(true);
....

The output I see is:

PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
FINISHED: main

Instead, I expect the code to do the following: load once and, while that load is running, use backpressure to hold back all further requests, then emit only the last one after the first load has finished. In total it should ideally load at most twice:

PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main

PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main

strangetimes asked Mar 11 '23 00:03


1 Answer

You can't do this with observeOn, because it buffers at least one element and therefore always runs the "PRE-LOADING" stage, even while a "LOADING" is already in progress.
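
To make the effect of that buffering concrete, here is a minimal, library-free model of the demand accounting. It is only a sketch under simplifying assumptions, not RxJava's actual implementation: `LatestDemandModel`, `LatestSource`, and `run` are hypothetical names, the source mimics a LATEST-style strategy by keeping only the newest undelivered value, and the consumer simply asks for `prefetch` items up front and again once it becomes free. The value 128 is used because that is observeOn's default buffer size.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model; not RxJava code. */
class LatestDemandModel {

    /** Keeps only the newest value while the downstream has no outstanding demand. */
    static final class LatestSource {
        private long requested;      // outstanding downstream demand
        private Integer pending;     // newest value not yet delivered, or null
        private final List<Integer> delivered = new ArrayList<>();

        /** Downstream signals that it can accept n more items. */
        void request(long n) {
            requested += n;
            if (pending != null && requested > 0) {
                deliver(pending);
                pending = null;
            }
        }

        /** Upstream emits a value; it is passed on only if there is demand. */
        void onNext(int value) {
            if (requested > 0) {
                deliver(value);
            } else {
                pending = value;     // overwrites any older undelivered value
            }
        }

        private void deliver(int value) {
            requested--;
            delivered.add(value);
        }
    }

    /**
     * Emits 1..count in a burst while the consumer is still busy with the
     * first item, then lets the consumer ask for more once it is free.
     */
    static List<Integer> run(int prefetch, int count) {
        LatestSource source = new LatestSource();
        source.request(prefetch);            // initial demand from downstream
        for (int i = 1; i <= count; i++) {
            source.onNext(i);                // burst of UI events
        }
        source.request(prefetch);            // consumer is free again
        return source.delivered;
    }

    public static void main(String[] args) {
        System.out.println(run(128, 8));     // prints [1, 2, 3, 4, 5, 6, 7, 8]
        System.out.println(run(1, 8));       // prints [1, 8]
    }
}
```

With a large up-front demand, every value in the burst finds outstanding demand, so nothing is ever dropped and each one is processed later. With one-at-a-time demand, only the first value and the newest one get through.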

However, you can do it with delay, because it doesn't manipulate the request amounts on the chain and schedules each onNext individually on the scheduler without queueing it on its own:

// Imports assume RxJava 2.x; RxJava 3 uses the io.reactivex.rxjava3.* packages instead.
import io.reactivex.BackpressureStrategy;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

import java.util.concurrent.TimeUnit;

public class LatestOnlyDemo {   // class name is illustrative

    public static void main(String[] args) throws Exception {
        Subject<Boolean> loadingQueue =
            PublishSubject.<Boolean>create().toSerialized();

        loadingQueue
            .toFlowable(BackpressureStrategy.LATEST)
            .delay(0, TimeUnit.MILLISECONDS, Schedulers.single())        // <-------
            .map(discarded -> {
                // PRE-LOADING
                System.out.println("PRE-LOADING: "
                    + Thread.currentThread().getName());
                return discarded;
            })
            .delay(0, TimeUnit.MILLISECONDS, Schedulers.computation())  // <-------
            .map(b -> {
                System.out.println("LOADING: "
                    + Thread.currentThread().getName());
                Thread.sleep(2000);   // simulates the 2-second job
                return b;
            })
            .delay(0, TimeUnit.MILLISECONDS, Schedulers.single())       // <-------
            .rebatchRequests(1)       // <------- request one-by-one
            .subscribe(b -> {
                System.out.println("FINISHED: "
                    + Thread.currentThread().getName() + "\n\n");
            });

        loadingQueue.onNext(true);
        loadingQueue.onNext(true);
        loadingQueue.onNext(true);

        // Keep the JVM alive long enough for the background work to finish.
        Thread.sleep(10000);
    }
}
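
For comparison, the same "only the newest request survives while the worker is busy" behaviour can be sketched without RxJava, using plain `java.util.concurrent`. This is a different technique from the operator chain above, shown only to illustrate the idea: `LatestOnlyWorker`, `submit`, and `demo` are hypothetical names, and the latches in `demo` exist purely to make the run deterministic.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical helper: runs the handler for the newest submitted value only. */
class LatestOnlyWorker<T> {
    private final AtomicReference<T> latest = new AtomicReference<>();
    private final AtomicInteger wip = new AtomicInteger(); // submissions not yet accounted for
    private final ExecutorService executor;
    private final Consumer<T> handler;

    LatestOnlyWorker(ExecutorService executor, Consumer<T> handler) {
        this.executor = executor;
        this.handler = handler;
    }

    /** Overwrites any value still waiting and starts the drain loop if it is idle. */
    void submit(T value) {
        latest.set(value);
        if (wip.getAndIncrement() == 0) {
            executor.execute(this::drain);
        }
    }

    /** Processes the newest value, then repeats if anything arrived in the meantime. */
    private void drain() {
        int missed = 1;
        do {
            T value = latest.getAndSet(null);
            if (value != null) {
                handler.accept(value);
            }
            missed = wip.addAndGet(-missed);
        } while (missed != 0);
    }

    /** Submits 1, then 2..5 while the handler is still busy with 1. */
    static List<Integer> demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<Integer> processed = new CopyOnWriteArrayList<>();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        LatestOnlyWorker<Integer> worker = new LatestOnlyWorker<Integer>(executor, value -> {
            processed.add(value);
            started.countDown();
            try {
                release.await();             // stands in for the slow job
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            worker.submit(1);
            started.await();                 // handler is now busy with 1
            for (int i = 2; i <= 5; i++) {
                worker.submit(i);            // each one overwrites the previous
            }
            release.countDown();
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(demo());          // prints [1, 5]
    }
}
```

The counter decides whether a drain loop is already running, so at most one handler call is active at a time and intermediate values are simply overwritten before they are ever picked up.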

akarnokd answered Apr 06 '23 02:04