Let's consider for a moment the following code
Rx.Observable.merge(
Rx.Observable.just(1),
Rx.Observable.just(1).delay(1000)
).distinctUntilChanged()
.subscribe(x => console.log(x))
We expect that 1
is logged just once. However what if we wanted to allow repetition of a value if its last emission was a configurable amount of time ago? I mean to get both events logged.
For example it would be cool to have something like the following
Rx.Observable.merge(
Rx.Observable.just(1),
Rx.Observable.just(1).delay(1000)
).distinctUntilChanged(1000)
.subscribe(x => console.log(x))
In which distinctUntilChanged()
accepts some sort of timeout to allow repetition on the next element. However such a thing does not exist and I was wondering if anybody knows an elegant way to achieve this by using high level operators without messing with a filter that would require handling state
Unless I am misunderstanding I am pretty sure this could be accomplished in a relatively straight-forward manner with windowTime
:
Observable
.merge(
Observable.of(1),
Observable.of(1).delay(250), // Ignored
Observable.of(1).delay(700), // Ignored
Observable.of(1).delay(2000),
Observable.of(1).delay(2200), //Ignored
Observable.of(2).delay(2300)
)
// Converts the stream into a stream of streams each 1000 milliseconds long
.windowTime(1000)
// Flatten each of the streams and emit only the latest (there should only be one active
// at a time anyway
// We apply the distinctUntilChanged to the windows before flattening
.switchMap(source => source.distinctUntilChanged())
.timeInterval()
.subscribe(
value => console.log(value),
error => console.log('error: ' + error),
() => console.log('complete')
);
See the example here (borrowed @martin's example inputs)
This is an interesting use-case. I wonder whether there's an easier solution than mine (note that I'm using RxJS 5):
let timedDistinctUntil = Observable.defer(() => {
let innerObs = null;
let innerSubject = null;
let delaySub = null;
function tearDown() {
delaySub.unsubscribe();
innerSubject.complete();
}
return Observable
.merge(
Observable.of(1),
Observable.of(1).delay(250), // ignored
Observable.of(1).delay(700), // ignored
Observable.of(1).delay(2000),
Observable.of(1).delay(2200), // ignored
Observable.of(2).delay(2300)
)
.do(undefined, undefined, () => tearDown())
.map(value => {
if (innerObs) {
innerSubject.next(value);
return null;
}
innerSubject = new BehaviorSubject(value);
delaySub = Observable.of(null).delay(1000).subscribe(() => {
innerObs = null;
});
innerObs = innerSubject.distinctUntilChanged();
return innerObs;
})
// filter out all skipped Observable emissions
.filter(observable => observable)
.switch();
});
timedDistinctUntil
.timestamp()
.subscribe(
value => console.log(value),
error => console.log('error: ' + error),
() => console.log('complete')
);
See live demo: https://jsbin.com/sivuxo/5/edit?js,console
The entire logic is wrapped into Observable.defer()
static method because it requires some additional variables.
A couple points how this all works:
The merge()
is the source of items.
I use do()
to properly catch when the source completes so I can shutdown the internal timer and send proper complete notification.
The map()
operator is where the most interesting things happen. I reemit the value that it received and then return null
if there's already a valid Observable (it was created less then 1000ms ago = innerObs != null
). Then I eventually create a new Subject where I'm going to reemit all items and return this BehaviorSubject
chained with .distinctUntilChanged()
. At the end I schedule 1s delay to set innerObs = null
which means then when another value arrives it'll return a new Observable with new .distinctUntilChanged()
.
Then filter()
will let me ignore all null
values returned. This means it won't emit a new Observable more than once a second.
Now I need to work with so called Higher-order Observables (Observables emitting Observables. For this reason I use switch()
operator that always subscribes only to the newest Observable emitted by the source. In our case we emit Observables only max. once a second (thanks to the filter()
used above) and this inner itself Observable can emit as many values it wants and all of them are going to be passed through distinctUntilChanged()
so duplicates are ignored.
The output for this demo will look like the following output:
Timestamp { value: 1, timestamp: 1484670434528 }
Timestamp { value: 1, timestamp: 1484670436475 }
Timestamp { value: 2, timestamp: 1484670436577 }
complete
As you can see the value 1
is emitted twice with cca 2s delay. However value 2
passed without any problem after 100ms thanks to distinctUntilChanged()
.
I know this isn't simple but I hope it makes sense to you :)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With