Deliver the first item immediately, 'debounce' following items

Consider the following use case:

  • deliver the first item as soon as possible
  • debounce the following events with a 1-second timeout

I ended up implementing a custom operator based on OperatorDebounceWithTime and using it like this:

.lift(new CustomOperatorDebounceWithTime<>(1, TimeUnit.SECONDS, Schedulers.computation()))

CustomOperatorDebounceWithTime delivers the first item immediately then uses OperatorDebounceWithTime operator's logic to debounce later items.

Is there an easier way to achieve the described behavior? Let's skip the compose operator, since it doesn't solve the problem. I'm looking for a way to achieve this without implementing custom operators.

tomrozb asked May 09 '15 12:05


4 Answers

Update: based on @lopar's comments, a better way would be:

Observable.from(items)
    .publish(publishedItems -> publishedItems
        .limit(1)
        .concatWith(publishedItems.skip(1).debounce(1, TimeUnit.SECONDS)))

Would something like this work?

String[] items = {"one", "two", "three", "four", "five", "six", "seven", "eight"};
Observable<String> myObservable = Observable.from(items);
Observable.concat(myObservable.first(), myObservable.skip(1).debounce(1, TimeUnit.SECONDS))
    .subscribe(s -> System.out.println(s));

LordRaydenMK answered Dec 09 '22 08:12


The answers by @LordRaydenMK and @lopar are best, but I wanted to suggest something else in case it works better for you or for someone in a similar situation.

There's a variant of debounce() that takes a function that decides how long to debounce this particular item for. It specifies this by returning an observable that completes after some amount of time. Your function could return empty() for the first item and timer() for the rest. Something like (untested):

String[] items = {"one", "two", "three", "four", "five", "six"};
Observable.from(items)
    .debounce(item -> item.equals("one")
            ? Observable.empty()
            : Observable.timer(1, TimeUnit.SECONDS));

The trick is that this function would have to know which item is the first. Your sequence might know that. If it doesn't, you might have to zip() with range() or something. Better in that case to use the solution in the other answer.
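
To make the per-item window idea concrete, here is a minimal plain-Java model, not RxJava code. It assumes items are identified by their position in the sequence (the role an index from zip() with range() would play), and that an item is emitted only if no later item arrives before its window ends. The class name, method name, and arrival times are hypothetical and chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntToLongFunction;

class IndexedDebounceModel {
    /**
     * Models debounce with a per-item quiet window. The item at position i arrives at
     * arrivalMs[i] (sorted ascending) and survives only if the next item arrives no
     * earlier than arrivalMs[i] + windowMs(i). Returns the positions that survive.
     */
    static List<Integer> survivors(long[] arrivalMs, IntToLongFunction windowMs) {
        List<Integer> emitted = new ArrayList<>();
        for (int i = 0; i < arrivalMs.length; i++) {
            boolean isLast = i == arrivalMs.length - 1;
            long deadline = arrivalMs[i] + windowMs.applyAsLong(i);
            // Assumption: the last item is flushed when the sequence completes.
            if (isLast || arrivalMs[i + 1] >= deadline) {
                emitted.add(i);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 100, 200, 1500, 1600, 3000};
        // Window of 0 ms for the first position, 1000 ms for every later one.
        System.out.println(survivors(arrivals, i -> i == 0 ? 0L : 1000L)); // [0, 2, 4, 5]
    }
}
```

Position 0 survives because its window is zero, which is what returning empty() for the first item is meant to achieve; later positions survive only after a full second of silence.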

Lawrence Kesteloot answered Dec 09 '22 08:12


A simple solution for RxJava 2.0, translated from an answer to the same question for RxJS: it merges throttleFirst and debounce, then removes consecutive duplicates.

private <T> ObservableTransformer<T, T> debounceImmediate() {
    return observable -> observable.publish(p ->
        Observable.merge(p.throttleFirst(1, TimeUnit.SECONDS),
            p.debounce(1, TimeUnit.SECONDS)).distinctUntilChanged());
}

@Test
public void testDebounceImmediate() {
    Observable.just(0, 100, 200, 1500, 1600, 1800, 2000, 10000)
        .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS).map(w -> v))
        .doOnNext(v -> System.out.println(LocalDateTime.now() + " T=" + v))
        .compose(debounceImmediate())
        .blockingSubscribe(v -> System.out.println(LocalDateTime.now() + " Debounced: " + v));
}

The approach using limit() or take() doesn't seem to handle long-lived data flows, where I might want to keep observing but still act immediately on the first event seen after a quiet period.
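
As a rough way to reason about what this combination emits, here is a plain-Java timeline model, not RxJava code. It assumes each item's value equals its arrival time in milliseconds (as in the test above), that throttleFirst reopens exactly one window after the item it let through, and that debounce flushes the last item when the source completes. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class DebounceImmediateModel {
    /** Returns the values delivered downstream; arrivalsMs must be sorted ascending. */
    static List<Long> simulate(long[] arrivalsMs, long windowMs) {
        List<long[]> merged = new ArrayList<>(); // each entry is {emissionTimeMs, value}

        // throttleFirst model: let one item through, then ignore items for windowMs.
        long gateOpensAt = Long.MIN_VALUE;
        for (long t : arrivalsMs) {
            if (t >= gateOpensAt) {
                merged.add(new long[] {t, t});
                gateOpensAt = t + windowMs;
            }
        }

        // debounce model: an item is emitted windowMs after it arrives, unless a newer
        // item arrives first. Assumption: the final item is flushed on completion.
        for (int i = 0; i < arrivalsMs.length; i++) {
            if (i == arrivalsMs.length - 1) {
                merged.add(new long[] {arrivalsMs[i], arrivalsMs[i]});
            } else if (arrivalsMs[i + 1] - arrivalsMs[i] > windowMs) {
                merged.add(new long[] {arrivalsMs[i] + windowMs, arrivalsMs[i]});
            }
        }

        // merge model: order both streams by emission time (the sort is stable).
        merged.sort(Comparator.comparingLong((long[] e) -> e[0]));

        // distinctUntilChanged model: drop a value equal to the one just before it.
        List<Long> out = new ArrayList<>();
        for (long[] e : merged) {
            if (out.isEmpty() || out.get(out.size() - 1).longValue() != e[1]) {
                out.add(e[1]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 100, 200, 1500, 1600, 1800, 2000, 10000};
        System.out.println(simulate(arrivals, 1000)); // [0, 200, 1500, 2000, 10000]
    }
}
```

Under these assumptions, the first event of each burst (0, 1500, 10000) goes out immediately and the last event of each burst (200, 2000) goes out after the quiet period, which is the long-lived behavior described above.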

Adrian Baker answered Dec 09 '22 07:12


The answers by LordRaydenMK and lopar have a problem: you always lose the second item. I suppose no one noticed this before because, if you are using debounce, you normally have a lot of events and the second one would be discarded by the debounce anyway. The correct way to never lose an event is:

observable
    .publish(published ->
        published
            .limit(1)
            .concatWith(published.debounce(1, TimeUnit.SECONDS)));

And don't worry, you are not going to get any duplicate events. If you aren't sure about it, you can run this code and check for yourself:

Observable.just(1, 2, 3, 4)
    .publish(published ->
        published
            .limit(1)
            .concatWith(published))
    .subscribe(System.out::println);
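
The reasoning behind this can be sketched with a small plain-Java model, not RxJava code. It assumes what this answer describes: on a published (hot) source, the tail passed to concatWith is subscribed only after the first item has been delivered, so it never sees that item. Debounce is left out to isolate the effect of skip. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PublishConcatModel {
    /** Models published.limit(1).concatWith(published.skip(skipCount)) on a hot source. */
    static List<Integer> run(List<Integer> items, int skipCount) {
        List<Integer> out = new ArrayList<>();
        boolean tailSubscribed = false;
        int skipped = 0;
        for (int item : items) {
            if (!tailSubscribed) {
                out.add(item);         // limit(1) delivers the head and completes
                tailSubscribed = true; // only now does the tail start listening
            } else if (skipped < skipCount) {
                skipped++;             // skip() drops the first items the tail sees
            } else {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3, 4);
        System.out.println(run(items, 1)); // [1, 3, 4]
        System.out.println(run(items, 0)); // [1, 2, 3, 4]
    }
}
```

In this model the tail's first visible item is already the second one, so skip(1) removes an item that was never a duplicate.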

Brais Gabin answered Dec 09 '22 08:12