 

RxJava merge debounced and not debounced observables

I have two observables:

Observable O(open): file with some content opened in textview

Observable E(edit): file content edited in textview

I want to debounce the E observable and merge it with the O observable.

obs = Observable.merge(E.debounce(2000, TimeUnit.MILLISECONDS), O)
                .subscribe(content -> System.out.println("new content: " + content));

The problem is that if E emits an event E1 and O emits an event O1 right after it, the output is:

new content: O1
new content: E1 // this output is redundant (because we already have the newer content O1)

This is a diagram of what is going on: [diagram]

How can I get rid of this stale event from the debounced observable?

wilddev asked Sep 21 '16 17:09



2 Answers

You can try

Observable.merge(O, O.switchMap(o -> E.debounce(2000, TimeUnit.MILLISECONDS)))
          .subscribe(content -> System.out.println("new content: " + content));

switchMap behaves much like flatMap, except that whenever the source Observable emits a new item, it unsubscribes from and stops mirroring the Observable generated from the previously emitted item and begins mirroring only the current one. Here that means each O event discards any edit still pending in the debounce for the previous file.
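To make the effect concrete, here is a minimal plain-Java model (not RxJava code) of which events would reach the subscriber under this scheme. It assumes O and E are hot, that no two events share a timestamp, and that an edit is emitted once the debounce interval passes with no later edit and no later open. The class `SwitchMapDebounceModel`, its `Event` type and the event names are hypothetical and only serve the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java model (not RxJava) of merge(O, O.switchMap(o -> E.debounce(d))).
final class SwitchMapDebounceModel {

    // One input event: 'O' (file opened) or 'E' (content edited) at a time in ms.
    static final class Event {
        final char kind;
        final long timeMs;
        final String content;

        Event(char kind, long timeMs, String content) {
            this.kind = kind;
            this.timeMs = timeMs;
            this.content = content;
        }
    }

    // Returns the contents that reach the subscriber, in emission order.
    static List<String> survivors(List<Event> input, long debounceMs) {
        List<Event> events = new ArrayList<>(input);
        events.sort(Comparator.comparingLong(e -> e.timeMs));

        List<Event> emitted = new ArrayList<>();
        boolean opened = false; // the inner E subscription starts with the first O
        for (int i = 0; i < events.size(); i++) {
            Event ev = events.get(i);
            if (ev.kind == 'O') {
                opened = true;
                emitted.add(ev); // O passes through merge unchanged
            } else if (opened) {
                boolean last = i == events.size() - 1;
                // A later E restarts the debounce timer; a later O unsubscribes
                // the inner stream. Either one inside the interval drops this edit.
                if (last || events.get(i + 1).timeMs >= ev.timeMs + debounceMs) {
                    emitted.add(new Event('E', ev.timeMs + debounceMs, ev.content));
                }
            }
        }
        emitted.sort(Comparator.comparingLong(e -> e.timeMs));

        List<String> out = new ArrayList<>();
        for (Event ev : emitted) {
            out.add(ev.content);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> timeline = Arrays.asList(
                new Event('O', 0, "O0"),
                new Event('E', 1000, "E1"),  // still pending when O1 arrives
                new Event('O', 1500, "O1"),
                new Event('E', 3000, "E2"));
        System.out.println(survivors(timeline, 2000)); // prints [O0, O1, E2]
    }
}
```

In this timeline E1 is dropped because O1 arrives 500 ms later, inside the 2000 ms debounce interval, which is the case the question asks about.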

Crocodilys answered Nov 20 '22 13:11



I see two main options. One is to use timestamps, which is easy enough, although there are theoretical race conditions (presumably very unlikely). The other is to associate a unique identifier with each file opening and attach that identifier to every edit event emitted for that opened file.

Using timestamps:

obs = Observable.defer(() -> {
  // true until the very first item has been emitted
  AtomicBoolean first = new AtomicBoolean(true);
  // timestamp edits before debouncing so they keep their original time
  return e.timestamp()
   .debounce(2000, TimeUnit.MILLISECONDS)
   .mergeWith(o.timestamp())
   .buffer(2, 1) // sliding window over consecutive pairs
   .flatMap(list -> {
     Observable<Object> start;
     if (first.compareAndSet(true, false))
         start = Observable.just(list.get(0).getValue());
     else
         start = Observable.empty();
     if (list.size() == 1)
         return start;
     else {
       Timestamped<Object> a = list.get(0);
       Timestamped<Object> b = list.get(1);
       // emit b only if it is not older than the item before it
       if (a.getTimestampMillis() <= b.getTimestampMillis())
         return start.concatWith(Observable.just(b.getValue()));
       else
         return start;
     }
   });
});

I suspect the timestamped version will be enough.
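The identifier option is not shown above, so here is a minimal plain-Java sketch of the idea. It assumes every event carries the id of the file opening it belongs to. The names `ContentEvent` and `StaleEditFilter` are hypothetical. In an RxJava pipeline the `accept` predicate could be applied with `filter` on the merged stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical event type: each event carries the id of its file opening.
final class ContentEvent {
    final long openId;    // id assigned when the file was opened
    final boolean isOpen; // true for an O event, false for an E event
    final String content;

    ContentEvent(long openId, boolean isOpen, String content) {
        this.openId = openId;
        this.isOpen = isOpen;
        this.content = content;
    }
}

// Drops edit events that belong to a file opening other than the latest one.
final class StaleEditFilter {
    private long currentOpenId = -1; // no file opened yet

    // Returns true if the event should be passed downstream.
    boolean accept(ContentEvent ev) {
        if (ev.isOpen) {
            currentOpenId = ev.openId; // a new opening becomes the current one
            return true;
        }
        return ev.openId == currentOpenId; // keep edits of the current file only
    }

    // Applies the filter to events in the order the merged stream delivers them.
    static List<String> filter(List<ContentEvent> merged) {
        StaleEditFilter f = new StaleEditFilter();
        List<String> out = new ArrayList<>();
        for (ContentEvent ev : merged) {
            if (f.accept(ev)) {
                out.add(ev.content);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // E1 edits the first file but leaves the debounce only after O1.
        List<ContentEvent> merged = Arrays.asList(
                new ContentEvent(1, true, "O0"),
                new ContentEvent(2, true, "O1"),
                new ContentEvent(1, false, "E1"),
                new ContentEvent(2, false, "E2"));
        System.out.println(filter(merged)); // prints [O0, O1, E2]
    }
}
```

Unlike the timestamp comparison, this check does not depend on clock readings, at the cost of threading an id through every event.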

Dave Moten answered Nov 20 '22 11:11
