Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Rx Operators. Ignore until next is emitted

In my app I have time consuming logic which can be started in many ways, let's say automatically or manually by user.

// Let's describe different event sources as relays
val autoStarts = PublishRelay.create<Unit>()
val manualStarts = PublishRelay.create<Unit>()
val syncStarts = PublishRelay.create<Unit>()

// This is my time consuming operation.
fun longOperation() = Observable.interval(10, TimeUnit.SECONDS).take(1).map { Unit }

val startsDisposable = Observable
        .merge(
                autoStarts.flatMap { Observable.just(Unit).delay(30, TimeUnit.SECONDS) },
                manualStarts
        )
        .subscribe(syncStarts) // merge emissions of both sources into one

val syncDisposable = syncStarts
        .concatMap {
            longOperation()
        }
        .subscribe(autoStarts) // end of long operation trigger start of auto timer

Start relays could produce many emissions. Let's say user clicks button for manual start and there is 5 secons left until auto start by timer. Both events will lead to longOperation() to start if it was simple flatMap. I want only one thread to run longOperation() inside, so if it's running now and not finished - ignore start emissions, anyway finish will lead to timer restart.

ConcatMap helps me there at half - it adds longOperation() to "queue" so they are processed one by one, but how could I write this to ignore any further starts until first one is completely finished?

like image 401
pavelkorolevxyz Avatar asked Oct 16 '22 09:10

pavelkorolevxyz


1 Answers

You can use flatMap() with an extra integer argument to limit parallelism.

syncStarts
  .onBackpressureDrop()               // 1
  .flatMap(() -> longOperation(), 1)  // 2
  ...
  1. Drop any emissions that occur while flatMap() is busy.
  2. The number 1 is the number of subscriptions that flatMap() makes, essentially forcing operations to be sequential.

The above does the functions that you want. However, you didn't specify what you wanted to happen once longOperation() is running: did you want another operation to start immediately after? If so, you need to change the back pressure handling to queue up at most one emission.

like image 50
Bob Dalgleish Avatar answered Oct 21 '22 05:10

Bob Dalgleish