Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJS Buffer or Buffer Size Limit

I need buffer functionality with output size limiting. Say I have an observable stream myInterval which I want to gate the output of using a notifier observable bufferBy, but when the notifier fires I want to limit the number of items emitted. buffer doesn't have an overload like this, but it illustrates what I'd like to achieve.

const maxBufferSize = 5;
const myInterval = interval(1000);
const bufferBy = fromEvent(document, 'click');

const bufferedInterval = myInterval.pipe(buffer(bufferBy, maxBufferSize));

// ex. output: [1,2,3] ... [4,5,6,7,8] ... [9,10]

Should be lossless. How to do this?

like image 296
ket Avatar asked Dec 01 '25 11:12

ket


2 Answers

Seems like the current RxJS operators are not able to do this. But it is not difficult to write our own operator with the wanted behaviour.

function bufferWithSize(bufferBy, maxBufferSize) {
  return (observable) =>
    new Observable((subscriber) => {
      let buffer = []

      // whenever bufferBy emits, we also emit the whole buffer to the subscriber
      const bufferBySubscription = bufferBy.subscribe(() => {
        subscriber.next(buffer)
        buffer = []
      })
      
      const subscription = observable.subscribe({
        next(value) {
          // when the source emits, we push the value into the buffer
          buffer.push(value)

          // if we reach the maxBufferSize, we emit the whole buffer
          if (buffer.length === maxBufferSize) {
            subscriber.next(buffer)
            buffer = []
          }
        },
        error(err) {
          subscriber.error(err);
        },
        complete() {
          // emit the rest of the buffer when the source completes
          if (buffer.length > 0) subscriber.next(buffer)
          subscriber.complete()
        },
      });
 
      return () => {
        // clean up subscriptions when bufferWithSize is unsubscribed
        subscription.unsubscribe();
        bufferBySubscription.unsubscribe()
      };
    });
}

Usage:

const maxBufferSize = 5;
const myInterval = interval(1000);
const bufferBy = interval(3000) // change this to 8000 to see the maxBufferSize in action

const bufferedInterval = myInterval.pipe(
  bufferWithSize(bufferBy, maxBufferSize)
).subscribe(res => console.log(res));
like image 97
Tobias S. Avatar answered Dec 03 '25 02:12

Tobias S.


Maybe just like this?

window operator

const maxBufferSize = 5;
const myInterval = interval(1000);
const bufferBy$ = fromEvent(document, 'click');

myInterval.pipe(
  window(bufferBy$),
  mergeMap(bufferCount(maxBufferSize)),
).subscribe(console.log);

https://stackblitz.com/edit/szp6ke

like image 44
Eddy Lin Avatar answered Dec 03 '25 03:12

Eddy Lin



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!