
RxJava Flowable.interval backpressure when using flatMap with Single

I'm having a scenario where I need to periodically call an API to check for a result. I'm using Flowable.interval to create an interval function which calls the API.

However, I'm having trouble with backpressure. In my example below, a new Single is created on each tick of the interval. The desired effect is to call the API only if a call is not already in progress.

Flowable.interval(1, 1, TimeUnit.SECONDS).flatMap {
        System.out.println("Delay $it")

        //simulates API call
        Single.just(1L).doAfterSuccess {
            System.out.println("NEW SINGLE!!!")
        }.delay(4, TimeUnit.SECONDS).doAfterSuccess {
            System.out.println("SINGLE SUCCESS!!!")
        }.toFlowable()
    }.subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).blockingFirst()

I can solve this using a filter variable like so:

var filter = true

Flowable.interval(1, 1, TimeUnit.SECONDS).filter {
    filter
}.flatMap {

    System.out.println("Delay $it")

    Single.just(1L).doOnSubscribe {
        filter = false  // block further ticks while this call is in flight
    }.doAfterSuccess {
        System.out.println("NEW SINGLE!!!")
    }.delay(4, TimeUnit.SECONDS).doAfterSuccess {
        System.out.println("SINGLE!!!")
        filter = true
    }.toFlowable()
}.subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).blockingFirst()

But it seems like a hacky solution. I've tried applying onBackpressureDrop after the interval function, but it has no effect.

Any suggestions?

Asked by Richard on Apr 13 '18 at 06:04




1 Answer

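You have to constrain flatMap as well. By default, flatMap requests a whole batch of ticks up front (128 with the standard buffer size) and runs that many inner sources concurrently, so onBackpressureDrop never sees a lack of demand and has nothing to drop. Limiting the concurrency to 1 makes it request the next tick only after the current Single has finished: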

Flowable.interval(1, 1, TimeUnit.SECONDS)
.onBackpressureDrop()
.flatMapSingle({
    System.out.println("Delay $it")

    //simulates API call
    Single.just(1L).doAfterSuccess {
        System.out.println("NEW SINGLE!!!")
    }.delay(4, TimeUnit.SECONDS).doAfterSuccess {
        System.out.println("SINGLE SUCCESS!!!")
    }
}, false, 1)  // <--- delayErrors = false, maxConcurrency = 1
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe()
Answered by akarnokd on Sep 30 '22 at 20:09
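
To see the effect of the maxConcurrency argument from the answer above, here is a minimal, self-contained sketch. It assumes RxJava 2 (the io.reactivex packages) is on the classpath. The names fakeApiCall and pollWithoutOverlap are hypothetical, and the timings (100 ms ticks, 350 ms per call) are shortened only so the demo finishes quickly. It records which ticks onBackpressureDrop discards while a call is still running.

```kotlin
import io.reactivex.Flowable
import io.reactivex.Single
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the real API call: succeeds after 350 ms.
fun fakeApiCall(tick: Long): Single<Long> =
    Single.just(tick).delay(350, TimeUnit.MILLISECONDS)

// Polls every 100 ms, but never starts a call while one is in progress.
// Returns the ticks whose calls completed and the ticks that were dropped.
fun pollWithoutOverlap(calls: Long): Pair<List<Long>, List<Long>> {
    val dropped = CopyOnWriteArrayList<Long>()

    val completed = Flowable.interval(100, TimeUnit.MILLISECONDS)
        // Ticks arriving while flatMapSingle has no outstanding request
        // are discarded here instead of failing the interval source.
        .onBackpressureDrop { tick -> dropped.add(tick) }
        // maxConcurrency = 1: the next tick is requested only after the
        // current Single has terminated.
        .flatMapSingle({ tick -> fakeApiCall(tick) }, false, 1)
        .take(calls)
        .toList()
        .blockingGet()

    return completed to dropped.toList()
}

fun main() {
    val (completed, dropped) = pollWithoutOverlap(3)
    println("ticks that triggered a call: $completed")
    println("ticks dropped while a call was running: $dropped")
}
```

Exact tick numbers depend on timing, so none are shown here. With these delays, several ticks should be dropped between consecutive calls. On RxJava 3 the same operators exist, but the imports come from io.reactivex.rxjava3.core instead.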