Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Rx distinctUntilChanged allow repetition after configurable time between events

Let's consider for a moment the following code

Rx.Observable.merge(
  Rx.Observable.just(1),
  Rx.Observable.just(1).delay(1000)
).distinctUntilChanged()
  .subscribe(x => console.log(x))

We expect that 1 is logged just once. However what if we wanted to allow repetition of a value if its last emission was a configurable amount of time ago? I mean to get both events logged.

For example it would be cool to have something like the following

Rx.Observable.merge(
  Rx.Observable.just(1),
  Rx.Observable.just(1).delay(1000)
).distinctUntilChanged(1000)
  .subscribe(x => console.log(x))

In which distinctUntilChanged() accepts some sort of timeout to allow repetition on the next element. However such a thing does not exist and I was wondering if anybody knows an elegant way to achieve this by using high level operators without messing with a filter that would require handling state

like image 891
franDayz Avatar asked Jan 17 '17 13:01

franDayz


2 Answers

Unless I am misunderstanding I am pretty sure this could be accomplished in a relatively straight-forward manner with windowTime:

Observable
  .merge(
   Observable.of(1),
   Observable.of(1).delay(250), // Ignored
   Observable.of(1).delay(700), // Ignored
   Observable.of(1).delay(2000),
   Observable.of(1).delay(2200), //Ignored
   Observable.of(2).delay(2300)
  )
  // Converts the stream into a stream of streams each 1000 milliseconds long
  .windowTime(1000)
  // Flatten each of the streams and emit only the latest (there should only be one active 
  // at a time anyway
  // We apply the distinctUntilChanged to the windows before flattening
  .switchMap(source => source.distinctUntilChanged())  
  .timeInterval()
  .subscribe(
    value => console.log(value),
    error => console.log('error: ' + error),
    () => console.log('complete')
  );

See the example here (borrowed @martin's example inputs)

like image 118
paulpdaniels Avatar answered Oct 15 '22 00:10

paulpdaniels


This is an interesting use-case. I wonder whether there's an easier solution than mine (note that I'm using RxJS 5):

let timedDistinctUntil = Observable.defer(() => {
    let innerObs = null;
    let innerSubject = null;
    let delaySub = null;

    function tearDown() {
        delaySub.unsubscribe();
        innerSubject.complete();
    }

    return Observable
        .merge(
            Observable.of(1),
            Observable.of(1).delay(250),  // ignored
            Observable.of(1).delay(700),  // ignored
            Observable.of(1).delay(2000),
            Observable.of(1).delay(2200), // ignored
            Observable.of(2).delay(2300)
        )
        .do(undefined, undefined, () => tearDown())
        .map(value => {
            if (innerObs) {
                innerSubject.next(value);
                return null;
            }

            innerSubject = new BehaviorSubject(value);

            delaySub = Observable.of(null).delay(1000).subscribe(() => {
                innerObs = null;
            });

            innerObs = innerSubject.distinctUntilChanged();
            return innerObs;
        })
        // filter out all skipped Observable emissions
        .filter(observable => observable)
        .switch();
});

timedDistinctUntil
    .timestamp()
    .subscribe(
        value => console.log(value),
        error => console.log('error: ' + error),
        () => console.log('complete')
    );

See live demo: https://jsbin.com/sivuxo/5/edit?js,console

The entire logic is wrapped into Observable.defer() static method because it requires some additional variables.

A couple points how this all works:

  1. The merge() is the source of items.

  2. I use do() to properly catch when the source completes so I can shutdown the internal timer and send proper complete notification.

  3. The map() operator is where the most interesting things happen. I reemit the value that it received and then return null if there's already a valid Observable (it was created less then 1000ms ago = innerObs != null). Then I eventually create a new Subject where I'm going to reemit all items and return this BehaviorSubject chained with .distinctUntilChanged(). At the end I schedule 1s delay to set innerObs = null which means then when another value arrives it'll return a new Observable with new .distinctUntilChanged().

  4. Then filter() will let me ignore all null values returned. This means it won't emit a new Observable more than once a second.

  5. Now I need to work with so called Higher-order Observables (Observables emitting Observables. For this reason I use switch() operator that always subscribes only to the newest Observable emitted by the source. In our case we emit Observables only max. once a second (thanks to the filter() used above) and this inner itself Observable can emit as many values it wants and all of them are going to be passed through distinctUntilChanged() so duplicates are ignored.

The output for this demo will look like the following output:

Timestamp { value: 1, timestamp: 1484670434528 }
Timestamp { value: 1, timestamp: 1484670436475 }
Timestamp { value: 2, timestamp: 1484670436577 }
complete

As you can see the value 1 is emitted twice with cca 2s delay. However value 2 passed without any problem after 100ms thanks to distinctUntilChanged().

I know this isn't simple but I hope it makes sense to you :)

like image 40
martin Avatar answered Oct 14 '22 22:10

martin