
RxJS Debounce with priority

I'm having trouble coming up with this stream.

What I'm looking for is something like debounceTime but with priority.

My events have the shape { type: 'a', priority: 2 }. They need to be debounced by a few seconds, but instead of the last event being emitted, the event with the highest priority should be emitted.

input stream:
------(a|1)--(b|3)---(c|2)-----------------------(a|1)-----------------


output stream:
-----------------------------------(b|3)---------------------(a|1)-----

I've tried looking at other operators like window and filtering through the result for the last event, but that's not ideal because window works on a fixed cadence, whereas I want the timer to start on the first event like debouncing does.
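
As a rough model of the behaviour requested above, the sketch below works on plain timestamped arrays instead of a real stream. The names modelDebounceHighest and Timed, the millisecond timestamps, and the 2000 ms delay are illustrative assumptions, not part of the question. It also assumes ordinary debounce semantics, where every event restarts the timer.

```typescript
// Event shape taken from the question's example.
interface PriorityEvent {
  type: string;
  priority: number;
}

// Hypothetical helper type: a value plus the time (in ms) it arrives or is emitted.
interface Timed<T> {
  time: number;
  value: T;
}

// Pure model of "debounce with priority": a burst ends once `dueTime` ms pass
// without a new event, and the burst's highest-priority event is emitted then.
// Assumes `events` is sorted by time; on equal priority the earlier event is kept.
function modelDebounceHighest<T extends { priority: number }>(
  events: Timed<T>[],
  dueTime: number
): Timed<T>[] {
  const output: Timed<T>[] = [];
  let highest: T | null = null;
  let lastTime = 0;

  for (const { time, value } of events) {
    // A quiet gap of at least dueTime closes the previous burst.
    if (highest !== null && time - lastTime >= dueTime) {
      output.push({ time: lastTime + dueTime, value: highest });
      highest = null;
    }
    if (highest === null || value.priority > highest.priority) {
      highest = value;
    }
    lastTime = time;
  }

  // The final burst closes dueTime after the last event.
  if (highest !== null) {
    output.push({ time: lastTime + dueTime, value: highest });
  }
  return output;
}

// The marble diagram from the question, with made-up timestamps.
const emitted = modelDebounceHighest<PriorityEvent>(
  [
    { time: 0, value: { type: 'a', priority: 1 } },
    { time: 500, value: { type: 'b', priority: 3 } },
    { time: 1000, value: { type: 'c', priority: 2 } },
    { time: 6000, value: { type: 'a', priority: 1 } },
  ],
  2000
);
```

With these inputs the first burst yields (b|3) and the second yields (a|1), matching the output stream in the diagram.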

Rico Kahler asked Aug 23 '19 20:08


3 Answers

You have to store and update the item with the highest priority, map every incoming item to this highest value, and pass the result to debounceTime. After each debounced emission, reset the stored item.

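import { debounceTime, map, tap } from 'rxjs/operators';

// Shared state, so this version is only safe with a single subscriber.
let highest = null;
source$.pipe(
  // Keep whichever item has the higher priority (the newer one on ties).
  map(v => highest = highest && highest.priority > v.priority ? highest : v),
  debounceTime(2000),
  // Start fresh after each debounced emission.
  tap(() => highest = null)
);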

You can create your own operator that does this with the help of defer. defer makes sure that every subscriber gets its own highest variable, as every subscriber will get its own new Observable created by calling the given factory function.

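import { defer, MonoTypeOperatorFunction, Observable } from 'rxjs';
import { debounceTime, map, tap } from 'rxjs/operators';

function debounceTimeHighest<T>(dueTime: number, getHighest: (curr: T, high: T) => T): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) => defer(() => {
    // Per-subscription state; typed T | null so it compiles under strictNullChecks.
    let highest: T | null = null;
    return source.pipe(
      // Compare against null explicitly so falsy items (0, '') are handled too.
      map(item => highest = highest !== null ? getHighest(item, highest) : item),
      debounceTime(dueTime),
      // Reset after every debounced emission.
      tap(() => highest = null)
    );
  });
}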

// Usage
source$.pipe(
  debounceTimeHighest(2000, (v1, v2) => v1.priority >= v2.priority ? v1 : v2)
)

The code above is TypeScript. If you want plain JavaScript, just remove all the types.

https://stackblitz.com/edit/rxjs-hitqxk
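
One detail worth checking is how the getHighest callback breaks ties, since the operator calls it with the new item first and the stored item second. The sketch below folds a plain array the same way the operator's map step does. The names GetHighest, preferNewest, preferEarliest, and foldBurst are made up for illustration.

```typescript
interface PriorityEvent {
  type: string;
  priority: number;
}

// Same shape as the `getHighest` callback of debounceTimeHighest:
// `curr` is the newly arrived item, `high` is the best one seen so far.
type GetHighest<T> = (curr: T, high: T) => T;

// On equal priority, `>=` returns `curr`, so the newest item wins.
const preferNewest: GetHighest<PriorityEvent> = (curr, high) =>
  curr.priority >= high.priority ? curr : high;

// On equal priority, `>` returns `high`, so the earliest item wins.
const preferEarliest: GetHighest<PriorityEvent> = (curr, high) =>
  curr.priority > high.priority ? curr : high;

// Folds one burst of items the way the operator's map step would.
function foldBurst<T>(burst: T[], getHighest: GetHighest<T>): T | null {
  let highest: T | null = null;
  for (const item of burst) {
    highest = highest === null ? item : getHighest(item, highest);
  }
  return highest;
}

const burst: PriorityEvent[] = [
  { type: 'a', priority: 3 },
  { type: 'b', priority: 1 },
  { type: 'c', priority: 3 },
];

const newest = foldBurst(burst, preferNewest);     // picks 'c'
const earliest = foldBurst(burst, preferEarliest); // picks 'a'
```

The usage example above uses >=, so it behaves like preferNewest. Switch to > if the first event of a given priority should win.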

frido answered Nov 05 '22 14:11


I'll offer the following solution, which uses scan to hand the highest-priority emission so far to debounceTime(). Note that scan has to start over after every debounced emission, so I use the window() operator to split up the emissions, starting a new observable window each time debounceTime() emits.

Here is the CodeSandbox

And here is some simplified code from the CodeSandbox showing the important bits:

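import { Subject } from 'rxjs';
import { debounceTime, mergeMap, scan, tap, window } from 'rxjs/operators';

// Closes the current window whenever it emits (drop <void> in plain JavaScript).
const resetScan$ = new Subject<void>();

source$.pipe(
  window(resetScan$),
  // Within each window, keep the highest-priority item seen so far.
  mergeMap(win$ => win$.pipe(
    scan((acc, cur) => acc.priority >= cur.priority ? acc : cur )
  )),
  debounceTime(debounceDelay),
  // After each debounced emission, start a new window so scan starts over.
  tap(() => resetScan$.next())
);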
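
To see why the reset matters, here is a small array-based sketch of what an unseeded scan produces with and without fresh windows. The helper runningHighest and the sample data are assumptions for illustration only. No RxJS is involved.

```typescript
interface PriorityEvent {
  type: string;
  priority: number;
}

// Array stand-in for `scan` without a seed: the first item becomes the
// accumulator, and each step yields the highest-priority item seen so far.
function runningHighest<T extends { priority: number }>(items: T[]): T[] {
  const out: T[] = [];
  let acc: T | undefined;
  for (const cur of items) {
    acc = acc !== undefined && acc.priority >= cur.priority ? acc : cur;
    out.push(acc);
  }
  return out;
}

// Two bursts, as in the question's marble diagram.
const burst1: PriorityEvent[] = [
  { type: 'a', priority: 1 },
  { type: 'b', priority: 3 },
  { type: 'c', priority: 2 },
];
const burst2: PriorityEvent[] = [{ type: 'a', priority: 1 }];

// One accumulator across both bursts: the stale b|3 still wins in burst 2.
const withoutReset = runningHighest([...burst1, ...burst2]);

// A fresh accumulator per window, which is what window(resetScan$) arranges.
const withReset = [burst1, burst2].map(items => runningHighest(items));
```

Without the reset, the last value of the second burst is still (b|3). With one accumulator per window it is (a|1), which is what the question asks for.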

dmcgrandle answered Nov 05 '22 15:11


You can combine the debounceTime and buffer operators, then use map to pick the highest-priority item from each buffer. I have developed this small example for it.

https://stackblitz.com/edit/typescript-lwzt4k

import { buffer, debounceTime, map } from 'rxjs/operators';

/*
Collect clicks into an array; emit the array once no click has occurred for 1000ms.
clicks$ is subscribed twice here (as the source and as the buffer notifier).
*/
clicks$.pipe(
  buffer(clicks$.pipe(debounceTime(1000))),
  // pick the highest-priority item from the buffered array
  map((clickArray) => {
    document.querySelector('#emittedObjects').innerHTML = (`<div>${JSON.stringify(clickArray)}</div>`);
    // sort a copy in descending priority order; equal priorities compare as 0
    const sortedArray = [...clickArray].sort((a, b) => b.priority - a.priority);

    const output = sortedArray.length > 0 ? sortedArray[0] : null;
    document.querySelector('#mappedOutput').innerHTML = JSON.stringify(output);
    return output;
  })
)
.subscribe((obj) => {
  const str = obj ? JSON.stringify(obj) : 'NULL';
  document.querySelector('#throttledOutput').innerHTML = `<div>THROTTLED: ${str}</div>`;
});
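
Sorting the whole buffer is more work than needed when only the top item matters. As a possible alternative, a single reduce pass can pick it without touching the array. The helper name pickHighest is hypothetical and the sample data is made up.

```typescript
interface PriorityEvent {
  type: string;
  priority: number;
}

// Picks the highest-priority item without sorting or mutating the input.
// Returns null for an empty buffer; on equal priority the earliest item is kept.
function pickHighest<T extends { priority: number }>(
  buffered: readonly T[]
): T | null {
  return buffered.reduce<T | null>(
    (best, cur) => (best === null || cur.priority > best.priority ? cur : best),
    null
  );
}

const buffered: PriorityEvent[] = [
  { type: 'a', priority: 1 },
  { type: 'b', priority: 3 },
  { type: 'c', priority: 2 },
];

const winner = pickHighest(buffered);          // the b|3 item
const none = pickHighest<PriorityEvent>([]);   // null for an empty buffer
```

Inside the pipeline this could stand in for the sort step, for example map(clickArray => pickHighest(clickArray)).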

Ashish Patel answered Nov 05 '22 15:11