
rxjs - buffer stream until function returns true

I have a stream of numbers that increase by a constant amount, and I want to sub-sample it. Given a constant sample interval, I want to buffer the stream until the difference between the first and last buffered values is greater than or equal to the interval. The buffered array should then be emitted, similar to the buffer operator.

I've searched through the different rxjs operators but can't figure out how to make this work. A bufferUntil operator would be perfect but doesn't seem to exist. How can I implement this?

For example:

const interval = 15;
// Example stream: 5, 10, 15, 20, 25, 30, ...

// source$ stands for the Observable of numbers; bufferUntil is the operator I'm looking for
source$.pipe(
   bufferUntil(bufferedArray => {
       const last = bufferedArray.length - 1;
       return bufferedArray[last] - bufferedArray[0] >= interval;
   })
).subscribe(x => console.log(x));

// Expected output: [5, 10, 15, 20], [25, 30, 35, 40], ...
MJ_Wales asked Dec 11 '19 16:12


1 Answer

You can implement an operator that maintains a custom buffer: push each incoming value into the buffer, emit the buffer once it meets a given condition, then reset it. Wrap the pipeline in defer so that every subscriber gets its own buffer.

import { defer, EMPTY, Observable, of, OperatorFunction } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';

function bufferUntil<T>(emitWhen: (currentBuffer: T[]) => boolean): OperatorFunction<T, T[]> {
  return (source: Observable<T>) => defer(() => {
    let buffer: T[] = []; // per-subscription buffer
    return source.pipe(
      tap(v => buffer.push(v)), // add each value to the buffer
      switchMap(() => emitWhen(buffer) ? of(buffer) : EMPTY), // emit the buffer when the condition is met
      tap(() => buffer = []) // start a fresh buffer after each emission
    );
  });
}

https://stackblitz.com/edit/rxjs-7awqmv
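
To check the push / test / reset logic without an Observable, here is a minimal sketch that applies the same steps to a plain array. The helper name bufferUntilSync is hypothetical and not part of RxJS; it only mirrors what the operator above does per value, using the interval and numbers from the question.

```typescript
// Hypothetical helper (not part of RxJS): applies the same push / test / reset
// steps as the operator above to a plain array instead of an Observable.
function bufferUntilSync<T>(
  values: T[],
  emitWhen: (currentBuffer: T[]) => boolean
): T[][] {
  const emitted: T[][] = [];
  let buffer: T[] = [];
  for (const value of values) {
    buffer.push(value);       // add the value to the buffer
    if (emitWhen(buffer)) {   // condition met: emit and start a fresh buffer
      emitted.push(buffer);
      buffer = [];
    }
  }
  // Values still in `buffer` here are dropped, as in the operator above,
  // which never emits a partial buffer when the source completes.
  return emitted;
}

const interval = 15;
const result = bufferUntilSync(
  [5, 10, 15, 20, 25, 30, 35, 40, 45],
  buffer => buffer[buffer.length - 1] - buffer[0] >= interval
);
console.log(JSON.stringify(result)); // [[5,10,15,20],[25,30,35,40]]
```

The trailing 45 never appears in the output because its buffer has not yet met the condition. If a final partial buffer is wanted, that would need extra handling on completion, which neither this sketch nor the operator above attempts.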

frido answered Oct 13 '22 11:10