In my app I have time consuming logic which can be started in many ways, let's say automatically or manually by user.
// Let's describe different event sources as relays
val autoStarts = PublishRelay.create<Unit>()
val manualStarts = PublishRelay.create<Unit>()
val syncStarts = PublishRelay.create<Unit>()
// This is my time consuming operation.
fun longOperation() = Observable.interval(10, TimeUnit.SECONDS).take(1).map { Unit }
val startsDisposable = Observable
.merge(
autoStarts.flatMap { Observable.just(Unit).delay(30, TimeUnit.SECONDS) },
manualStarts
)
.subscribe(syncStarts) // merge emissions of both sources into one
val syncDisposable = syncStarts
.concatMap {
longOperation()
}
.subscribe(autoStarts) // end of long operation trigger start of auto timer
Start relays could produce many emissions. Let's say user clicks button for manual start and there is 5 secons left until auto start by timer. Both events will lead to longOperation()
to start if it was simple flatMap
.
I want only one thread to run longOperation()
inside, so if it's running now and not finished - ignore start emissions, anyway finish will lead to timer restart.
ConcatMap
helps me there at half - it adds longOperation()
to "queue" so they are processed one by one, but how could I write this to ignore any further starts until first one is completely finished?
You can use flatMap()
with an extra integer argument to limit parallelism.
syncStarts
.onBackpressureDrop() // 1
.flatMap(() -> longOperation(), 1) // 2
...
flatMap()
is busy.flatMap()
makes, essentially forcing operations to be sequential.The above does the functions that you want. However, you didn't specify what you wanted to happen once longOperation()
is running: did you want another operation to start immediately after? If so, you need to change the back pressure handling to queue up at most one emission.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With