I need buffer functionality with output size limiting. Say I have an observable stream myInterval whose output I want to gate using a notifier observable bufferBy, but each emitted array should contain at most a fixed number of items. buffer has no overload like this; the snippet below only illustrates what I'd like to achieve.
const maxBufferSize = 5;
const myInterval = interval(1000);
const bufferBy = fromEvent(document, 'click');
const bufferedInterval = myInterval.pipe(buffer(bufferBy, maxBufferSize));
// ex. output: [1,2,3] ... [4,5,6,7,8] ... [9,10]
It should be lossless: no source values may be dropped. How can I do this?
It seems that no single built-in RxJS operator does this. But it is not difficult to write a custom operator with the desired behaviour.
import { Observable } from 'rxjs';

function bufferWithSize(bufferBy, maxBufferSize) {
  return (observable) =>
    new Observable((subscriber) => {
      let buffer = [];
      // whenever bufferBy emits, we also emit the whole buffer to the subscriber
      // (this can be an empty array if nothing arrived since the last emission)
      const bufferBySubscription = bufferBy.subscribe({
        next() {
          subscriber.next(buffer);
          buffer = [];
        },
        // forward notifier errors instead of leaving them unhandled
        error(err) {
          subscriber.error(err);
        },
      });
      const subscription = observable.subscribe({
        next(value) {
          // when the source emits, we push the value into the buffer
          buffer.push(value);
          // if we reach the maxBufferSize, we emit the whole buffer
          if (buffer.length === maxBufferSize) {
            subscriber.next(buffer);
            buffer = [];
          }
        },
        error(err) {
          subscriber.error(err);
        },
        complete() {
          // emit the rest of the buffer when the source completes
          if (buffer.length > 0) subscriber.next(buffer);
          subscriber.complete();
        },
      });
      return () => {
        // clean up subscriptions when bufferWithSize is unsubscribed
        subscription.unsubscribe();
        bufferBySubscription.unsubscribe();
      };
    });
}
Usage:
import { interval } from 'rxjs';

const maxBufferSize = 5;
const myInterval = interval(1000);
const bufferBy = interval(3000); // change this to 8000 to see the maxBufferSize in action
// subscribe() returns a Subscription, not an Observable
const subscription = myInterval.pipe(
  bufferWithSize(bufferBy, maxBufferSize)
).subscribe(res => console.log(res));
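To check what the operator emits without waiting on real timers, its buffering rules can be replayed over a hand-written timeline. This is only a plain-JavaScript model, not RxJS code: the helper simulateBufferWithSize and the { type, value } event shape are made up for illustration.

```javascript
// Hypothetical helper: applies the same rules as bufferWithSize to a fixed
// list of events instead of live observables.
function simulateBufferWithSize(events, maxBufferSize) {
  const emitted = [];
  let buffer = [];
  for (const event of events) {
    if (event.type === 'notify') {
      // notifier fired: flush whatever is buffered (possibly nothing)
      emitted.push(buffer);
      buffer = [];
    } else {
      buffer.push(event.value);
      // size limit reached: flush without waiting for the notifier
      if (buffer.length === maxBufferSize) {
        emitted.push(buffer);
        buffer = [];
      }
    }
  }
  // source completed: flush the remainder
  if (buffer.length > 0) emitted.push(buffer);
  return emitted;
}

// Assumed event constructors, for illustration only
const value = (v) => ({ type: 'value', value: v });
const notify = { type: 'notify' };

// three values, a notification, six values, a notification, two values, end
const timeline = [
  value(1), value(2), value(3), notify,
  value(4), value(5), value(6), value(7), value(8), value(9), notify,
  value(10), value(11),
];

console.log(JSON.stringify(simulateBufferWithSize(timeline, 5)));
// [[1,2,3],[4,5,6,7,8],[9],[10,11]]
```

No value is lost: the sixth value of the long run simply waits in the buffer until the next notification. Note that, under these rules, a notification that arrives while the buffer is empty emits an empty array.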
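Maybe just like this, using the window operator? window starts a new inner observable each time bufferBy$ emits, and bufferCount splits each of those windows into arrays of at most maxBufferSize items.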
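import { fromEvent, interval } from 'rxjs';
import { bufferCount, mergeMap, window } from 'rxjs/operators';

const maxBufferSize = 5;
const myInterval = interval(1000);
const bufferBy$ = fromEvent(document, 'click');
myInterval.pipe(
  // open a new window on every click
  window(bufferBy$),
  // chunk each window into arrays of at most maxBufferSize items
  mergeMap(bufferCount(maxBufferSize)),
).subscribe(console.log);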
https://stackblitz.com/edit/szp6ke
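The effect of combining window and bufferCount can be sketched in plain JavaScript over a fixed timeline. This is a rough model rather than RxJS code: simulateWindowBufferCount and the event shape are hypothetical, and the model ignores timing (in the real pipeline a full chunk is emitted as soon as it fills, not when the window closes).

```javascript
// Hypothetical helper: models window(notifier) followed by
// bufferCount(maxBufferSize) on a fixed list of events.
function simulateWindowBufferCount(events, maxBufferSize) {
  // window: start a new group of values each time the notifier fires
  const windows = [[]];
  for (const event of events) {
    if (event.type === 'tick') windows.push([]);
    else windows[windows.length - 1].push(event.value);
  }
  // bufferCount: cut every window into chunks of at most maxBufferSize
  const emitted = [];
  for (const win of windows) {
    for (let i = 0; i < win.length; i += maxBufferSize) {
      emitted.push(win.slice(i, i + maxBufferSize));
    }
  }
  return emitted;
}

// Assumed event constructors, for illustration only
const val = (v) => ({ type: 'value', value: v });
const tick = { type: 'tick' };

// note the two consecutive ticks: the empty window between them emits nothing
const events = [
  val(1), val(2), val(3), tick,
  val(4), val(5), val(6), val(7), val(8), val(9), tick, tick,
  val(10), val(11),
];

console.log(JSON.stringify(simulateWindowBufferCount(events, 5)));
// [[1,2,3],[4,5,6,7,8],[9],[10,11]]
```

In this model an empty window produces no output at all, whereas a hand-written operator that flushes its buffer on every notification would emit an empty array in the same situation.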