I'm having trouble coming up with this stream. What I'm looking for is something like debounceTime but with priority.
Say I have events with the shape { type: 'a', priority: 2 }. These events need to be debounced by a few seconds, but instead of the last event being emitted, the event with the highest priority is emitted.
input stream:
------(a|1)--(b|3)---(c|2)-----------------------(a|1)-----------------
output stream:
-----------------------------------(b|3)---------------------(a|1)-----
I've tried looking at other operators like window and filtering through the result for the last event, but it's not ideal because window works on a fixed cadence, whereas I want the timer to start on the first event like debouncing does.
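To pin down the behaviour described above, here is a minimal sketch of the intended semantics as a plain function over timestamped events, with no RxJS involved. The names PriorityEvent, TimedEvent, and debounceHighest are hypothetical, and the sketch assumes debounceTime-style timing (a burst ends once dueTime ms pass with no new event) and that the earlier event wins on equal priority.

```typescript
// Hypothetical types and function, for illustration only.
interface PriorityEvent {
  type: string;
  priority: number;
}

interface TimedEvent {
  time: number; // arrival time in ms
  event: PriorityEvent;
}

// Models the desired output: each burst of events yields its highest-priority
// event, stamped with the time at which the debounce timer would fire.
// Assumes `input` is sorted by time and that the source stays open after the last event.
function debounceHighest(input: TimedEvent[], dueTime: number): TimedEvent[] {
  const output: TimedEvent[] = [];
  let best: TimedEvent | null = null; // highest priority seen in the current burst
  let lastTime = 0; // arrival time of the most recent event

  for (const item of input) {
    if (best !== null && item.time - lastTime >= dueTime) {
      // The quiet period elapsed before this event arrived: close the burst.
      output.push({ time: lastTime + dueTime, event: best.event });
      best = null;
    }
    if (best === null || item.event.priority > best.event.priority) {
      best = item;
    }
    lastTime = item.time;
  }
  if (best !== null) {
    // Close the final burst.
    output.push({ time: lastTime + dueTime, event: best.event });
  }
  return output;
}

const result = debounceHighest(
  [
    { time: 0, event: { type: 'a', priority: 1 } },
    { time: 500, event: { type: 'b', priority: 3 } },
    { time: 1000, event: { type: 'c', priority: 2 } },
    { time: 5000, event: { type: 'a', priority: 1 } },
  ],
  2000
);
console.log(result.map(r => `${r.time}:${r.event.type}|${r.event.priority}`));
// → [ '3000:b|3', '7000:a|1' ]
```

A model like this is only a reference for expected outputs; it mirrors the marble diagram above, where (b|3) wins the first burst and the lone (a|1) wins the second.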
You have to store and update the item with the highest priority, and map each incoming value to this highest item, which you then pass to debounceTime.
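import { debounceTime, map, tap } from 'rxjs/operators';

let highest = null;
source$.pipe(
  // Replace each value with the highest-priority item seen since the last emission
  map(v => highest = highest && highest.priority > v.priority ? highest : v),
  debounceTime(2000),
  // Reset after each emission so the next burst starts fresh
  tap(() => highest = null)
);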
You can create your own operator that does this with the help of defer. defer makes sure that every subscriber gets its own highest variable, as every subscriber will get its own new Observable created by calling the given factory function.
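import { defer, MonoTypeOperatorFunction, Observable } from 'rxjs';
import { debounceTime, map, tap } from 'rxjs/operators';

function debounceTimeHighest<T>(dueTime: number, getHighest: (curr: T, high: T) => T): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) => defer(() => {
    // Per-subscription state: each subscriber gets its own `highest`
    let highest: T | null = null;
    return source.pipe(
      map(item => highest = highest !== null ? getHighest(item, highest) : item),
      debounceTime(dueTime),
      tap(() => highest = null)
    );
  });
}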
// Usage
source$.pipe(
  debounceTimeHighest(2000, (v1, v2) => v1.priority >= v2.priority ? v1 : v2)
)
The code above is TypeScript. If you want plain JavaScript, just remove all the types.
https://stackblitz.com/edit/rxjs-hitqxk
I'll offer the following solution, based around using scan to offer up the highest-priority emission so far for consideration by debounceTime(). Note that scan needs to start over with fresh data after every successful debounce, so I use the window() operator to split up the emissions, starting a new observable window after every emission by debounceTime().
Here is the CodeSandbox
And here is some simplified code from the CodeSandbox showing the important bits:
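import { Subject } from 'rxjs';
import { debounceTime, mergeMap, scan, tap, window } from 'rxjs/operators';

const resetScan$ = new Subject();
source$.pipe(
  // Start a new window each time resetScan$ fires
  window(resetScan$),
  mergeMap(win$ => win$.pipe(
    // Track the highest-priority item within the current window
    scan((acc, cur) => acc.priority >= cur.priority ? acc : cur)
  )),
  debounceTime(debounceDelay),
  // Each debounced emission closes the current window and opens a new one
  tap(() => resetScan$.next())
);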
You can combine the buffer and debounceTime operators, then use map to pick the highest-priority item from each buffer, to achieve what you need. I have put together this small example:
https://stackblitz.com/edit/typescript-lwzt4k
/*
Collect clicks; once no click has occurred for 1000ms, emit them as an array
*/
clicks$.pipe(
  buffer(clicks$.pipe(debounceTime(1000))),
  // pick the highest-priority object from the buffered array
  map((clickArray) => {
    document.querySelector('#emittedObjects').innerHTML = (`<div>${JSON.stringify(clickArray)}</div>`);
    // sort descending by priority (note: sort mutates clickArray in place)
    const sortedArray = clickArray.sort((a, b) => b.priority - a.priority);
    const output = sortedArray.length > 0 ? sortedArray[0] : null;
    document.querySelector('#mappedOutput').innerHTML = JSON.stringify(output);
    return output;
  })
)
.subscribe((obj) => {
  const str = obj ? JSON.stringify(obj) : 'NULL';
  document.querySelector('#throttledOutput').innerHTML = `<div>THROTTLED: ${str}</div>`;
});
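As a possible refinement of the sorting step above, the highest-priority item can also be found in a single pass without sorting or mutating the buffered array. This is only a sketch: the Prioritized interface and the pickHighest helper are hypothetical names, not part of RxJS or of the linked example, and the earliest item is assumed to win on equal priority.

```typescript
// Hypothetical helper, for illustration only.
interface Prioritized {
  priority: number;
}

// Returns the item with the highest priority, or null for an empty array.
// On ties the earliest item wins; the input array is left unmodified.
function pickHighest<T extends Prioritized>(items: readonly T[]): T | null {
  return items.reduce<T | null>(
    (best, item) => (best === null || item.priority > best.priority ? item : best),
    null
  );
}

const sample = [
  { type: 'a', priority: 1 },
  { type: 'b', priority: 3 },
  { type: 'c', priority: 2 },
];
console.log(pickHighest(sample)); // → { type: 'b', priority: 3 }
```

With a helper like this, the body of the map callback could reduce to a call such as pickHighest(clickArray), leaving the DOM updates as they are.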