Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava- How to backpressure a flatmap()

Perhaps I'm overlooking a simple combination of operators (or an inherent cancellation behavior of RxJava altogether). But suppose I have a hot observable selectedItem that flatmaps to an RxJava-JDBC query.

@Test
public void testFlatMapBackPressure() { 
    Database db = null; //assign db

    BehaviorSubject<Integer> selectedItem = BehaviorSubject.create();

    //can I backpressure the queries so only the latest one is running, and any previous is cancelled?
    Observable<List<Integer>> currentValues = selectedItem.flatMap(i ->  db.select("SELECT VALUE FROM MY_TABLE WHERE ID =?")
            .parameter(i)
            .getAs(Integer.class)
            .toList());
}

How can I backpressure the flatMap() operator so it will ALWAYS only execute the latest query (and cancel any previous). I kind of want a backpressured flatMap operator to do something like this, where the "X" indicates a cancellation of the previous query

enter image description here

Is there a way to accomplish this? Or can it be accomplished already and I just do not see it?

like image 539
tmn Avatar asked Sep 25 '15 15:09

tmn


1 Answers

Sounds like you need switchMap() instead of flatMap().

Returns a new Observable by applying a function that you supply to each item emitted by the source Observable that returns an Observable, and then emitting the items emitted by the most recently emitted of these Observables.

enter image description here

like image 137
akarnokd Avatar answered Oct 22 '22 11:10

akarnokd