Kotlin Flow onBackpressureDrop RxJava2 analog

In RxJava 2, Flowable supports different backpressure strategies; the most interesting among them are:

  • LATEST
  • BUFFER
  • DROP

which are respected throughout the whole Rx chain.
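
For reference, in RxJava 2 the strategy is applied explicitly via the onBackpressure* operators. A rough sketch (the interval source and the empty consumer are placeholders, not my actual code):

import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit

fun rxDrop() {
    Flowable.interval(1, TimeUnit.MILLISECONDS)          // fast producer
        .onBackpressureDrop()                            // or onBackpressureBuffer() / onBackpressureLatest()
        .observeOn(Schedulers.computation())             // bounded downstream buffer, so backpressure applies
        .subscribe { /* slow consumer */ }
}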

In Kotlin there is Flow, which is declared to have backpressure support out of the box. I was able to get the BUFFER and LATEST strategies with Flow using the following:

For BUFFER:

observeFlow()
    .buffer(10)
    .collect { ... }

With LATEST:

observeFlow()
    .conflate()
    .collect { ... }

This is just a shortcut for that same buffer operator.
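
As far as I understand, conflate() is documented as a shortcut for buffer with a conflated capacity, so the above should be equivalent to:

observeFlow()
    .buffer(Channel.CONFLATED) // kotlinx.coroutines.channels.Channel.CONFLATED, same effect as conflate()
    .collect { ... }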

But I was not able to find anything that works the same way as DROP. In short, DROP discards any value that arrives in the stream while the previous value hasn't been processed yet. With Flow, I'm not even sure it is possible at all.

Considering the case:

observeFlow()
    .backpressureDrop() // non-existent operator, just for illustrative purposes
    .map { ... }
    .flatMapMerge { ... }
    .collect { ... }

So backpressureDrop should respect any work done below it in the stream, yet that operator doesn't know anything about what is happening downstream (without an explicit callback from the bottom, like the "request" method of an RxJava Subscriber). Therefore it doesn't seem possible. The operator should not pass through any event before the previous item has been collected.

Is there a ready-made operator that I've missed, or is there a straightforward way to implement something like this with the existing API?

asked Jan 25 '20 by krossovochkin


2 Answers

We can build this using a Flow backed by a Rendezvous Channel.

When capacity is 0 – it creates RendezvousChannel. This channel does not have any buffer at all. An element is transferred from sender to receiver only when send and receive invocations meet in time (rendezvous), so send suspends until another coroutine invokes receive and receive suspends until another coroutine invokes send.

A Rendezvous channel has no buffer. Therefore, a consumer of this channel must already be suspended and waiting for the next element in order for an element to be sent to it. We can exploit this property, using Channel.offer (a normal non-suspending function), to drop values that can't be accepted without suspending.

Channel.offer

Adds element into this queue if it is possible to do so immediately without violating capacity restrictions and returns true. Otherwise, it returns false immediately or throws exception if the channel isClosedForSend (see close for details).
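
Here is a minimal sketch of that behaviour in isolation, using trySend, the non-deprecated replacement for offer (the channel and values are only for illustration):

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main() = runBlocking {
    val channel = Channel<Int>(capacity = Channel.RENDEZVOUS)
    println(channel.trySend(1).isSuccess)   // false: no receiver is suspended yet, the element is dropped
    val receiver = launch { println("received ${channel.receive()}") }
    yield()                                 // let the receiver suspend inside receive()
    println(channel.trySend(2).isSuccess)   // true: the receiver was already waiting
    receiver.join()
}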

Because channelFlow is buffered by default, we need to apply Flow<T>.buffer(capacity = 0) downstream to remove that buffer.

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.collect

/**
 * Consume this [Flow] using a channelFlow with no buffer. Elements emitted from [this] flow
 * are offered to the underlying [channelFlow]. If the consumer is not currently suspended and
 * waiting for the next element, the element is dropped.
 *
 * @return a flow that only emits elements when the downstream [Flow.collect] is waiting for the next element
 */
fun <T> Flow<T>.drop(): Flow<T> = channelFlow {
    // offer is non-suspending; in newer kotlinx.coroutines versions trySend(it) replaces the deprecated offer
    collect { offer(it) }
}.buffer(capacity = 0)

Here's an example of how a slow consumer can use this to drop elements.

fun main() = runBlocking {
    flow {
        (0..100).forEach {
            emit(it)
            delay(100)
        }
    }.drop().collect {
        delay(1000)
        println(it)
    }
}

with the corresponding output:

0
11
21
31
41
51
61
71
81
91
answered Sep 23 '22 by Kevin Cianfarini


is there a straightforward way to implement something like this

Depends on your measure of straightforward. Here is how I would do it.

Backpressure translates to programmatic suspension and resumption in the coroutines world. For onBackpressureDrop, the downstream has to indicate it is ready for one item and suspend until it arrives, while the upstream should never wait for the downstream to be ready.
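
To see that in isolation: with no operator in between, emit() simply suspends until the collector's block returns. A minimal sketch (the numbers and delays are arbitrary):

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    flow {
        repeat(3) {
            println("emitting $it")
            emit(it)              // suspends until the collector below has finished with the item
        }
    }.collect {
        delay(100)                // slow consumer
        println("collected $it")
    }
}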

You have to consume the upstream in an unbounded manner and hand over items and terminal events to the downstream waiting for those signals.

package hu.akarnokd.kotlin.flow.impl

import hu.akarnokd.kotlin.flow.Resumable
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.AbstractFlow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.collect
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference

@FlowPreview
internal class FlowOnBackpressureDrop<T>(private val source: Flow<T>)
 : AbstractFlow<T>() {
    @ExperimentalCoroutinesApi
    @InternalCoroutinesApi
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        coroutineScope {
            val consumerReady = AtomicBoolean()
            val producerReady = Resumable()
            val value = AtomicReference<T>()
            val done = AtomicBoolean()
            val error = AtomicReference<Throwable>()

            launch {
                try {
                    source.collect {
                        if (consumerReady.get()) {
                            value.set(it)
                            consumerReady.set(false)
                            producerReady.resume()
                        }
                    }
                    done.set(true)
                } catch (ex: Throwable) {
                    error.set(ex)
                }
                producerReady.resume()
            }

            while (true) {
                consumerReady.set(true)
                producerReady.await()

                val d = done.get()
                val ex = error.get()
                val v = value.getAndSet(null)

                if (ex != null) {
                    throw ex
                }
                if (d) {
                    break
                }

                collector.emit(v)
            }
        }
    }
}

Note: Resumable is a helper primitive from the hu.akarnokd.kotlin.flow package (the kotlin-flow-extensions library); see its implementation there.

So let's walk through the implementation.

First, one needs 5 variables to hand over information between the collector of the upstream and the collector working for the downstream:

  • consumerReady: indicates the downstream is ready for the next item
  • producerReady: indicates the producer has stored the next item (or a terminal signal) and the downstream can resume
  • value: the upstream item ready for consumption
  • done: set when the upstream has ended
  • error: set when the upstream has failed

Next, we have to launch the collector for the upstream, because collect is suspending and would not let the downstream consumer loop run at all until completion. In this collector, we check if the downstream consumer is ready (via consumerReady) and if so, store the current item, clear the readiness flag and signal its availability via producerReady. Clearing consumerReady prevents subsequent upstream items from being stored until the downstream itself indicates readiness again.

When the upstream ends or crashes, we set the done or error variables and indicate the producer has spoken.

After the launch { } part, we keep consuming the shared variables on behalf of the downstream collector.

The first thing in each round is to indicate we are ready for the next value, then wait for the producer side to signal that it has placed the next event in the shared variable(s).

Next, we read the values from these variables. We handle an error or completion eagerly, and only otherwise emit the upstream item to the downstream collector.
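
For completeness, a hypothetical usage sketch; the onBackpressureDrop extension below is just an assumed wrapper placed in the same module as the internal class (the kotlin-flow-extensions library exposes it through a similar extension), with opt-in annotations omitted:

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun <T> Flow<T>.onBackpressureDrop(): Flow<T> = FlowOnBackpressureDrop(this)

fun main() = runBlocking {
    flow {
        repeat(20) {
            emit(it)          // fast producer: one item every 100 ms
            delay(100)
        }
    }.onBackpressureDrop()
        .collect {
            delay(450)        // slow consumer: items arriving in the meantime are dropped
            println(it)
        }
}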

answered Sep 22 '22 by akarnokd