I created the Observable constructor below that works as described. Does anyone know if there is a more concise way of achieving the same behaviour using the operators that come with RxJs? I was looking at bufferToggle which is close to the required behaviour, but I need the emitted values to be passed through when the buffer is closed.
Function Description: Buffers the emitted `source` values if the `condition` emits `true`, and passes through the emitted `source` values if the `condition` emits `false`. If the `condition` emits `false` after being `true`, the buffer releases each value in the order that they were received. The buffer is initialised to pass through the emitted `source` values until the `condition` emits `true`.
/**
 * Gates `source` behind a boolean `condition` stream.
 *
 * While the latest `condition` value is `true`, `source` values are held in a FIFO
 * buffer; while it is `false` (the initial state) they are passed straight through.
 * When `condition` goes back to `false`, held values are released in arrival order
 * before any newer value is passed through.
 *
 * Errors from either input are forwarded to the subscriber. The result completes once
 * `source` has completed and every held value has been released. If `condition` has
 * completed as well, no further `false` can arrive, so whatever is still held is
 * released at that point rather than being kept forever.
 *
 * @param condition Emits `true` to start buffering, `false` to release and pass through.
 * @param source The values to gate.
 * @returns An observable of the `source` values, delayed while buffering is active.
 */
function bufferIf<T>(condition: Observable<boolean>, source: Observable<T>): Observable<T> {
  return new Observable<T>(subscriber => {
    const subscriptions = new Subscription();
    const buffer: T[] = [];
    let isBufferOpen = false;
    let sourceCompleted = false;
    let conditionCompleted = false;

    // Releases held values oldest-first. Stops early if the gate closes again while
    // draining (a downstream handler may flip `condition` synchronously), unless
    // `ignoreGate` is set.
    const drain = (ignoreGate: boolean): void => {
      while (buffer.length > 0 && (ignoreGate || !isBufferOpen)) {
        // The length check above guarantees shift() yields a value.
        subscriber.next(buffer.shift() as T);
      }
    };

    // Completes the output once the source is finished and nothing is left to release.
    const completeIfDone = (): void => {
      if (!sourceCompleted) {
        return;
      }
      if (conditionCompleted) {
        // The gate can never change again, so holding values any longer is pointless.
        drain(true);
      }
      if (buffer.length === 0) {
        subscriber.complete();
      }
    };

    // handle source events
    subscriptions.add(
      source.subscribe({
        next: value => {
          // Keep buffering while the gate is closed, and also while earlier values are
          // still being released, so ordering is preserved.
          if (isBufferOpen || buffer.length > 0) {
            buffer.push(value);
          } else {
            subscriber.next(value);
          }
        },
        error: err => subscriber.error(err),
        complete: () => {
          sourceCompleted = true;
          completeIfDone();
        },
      })
    );

    // A synchronous source may already have completed the subscriber; in that case
    // there is nothing left to gate.
    if (!subscriber.closed) {
      // handle condition events
      subscriptions.add(
        condition
          .do(value => { isBufferOpen = value; })
          .filter(value => !value)
          .subscribe({
            next: () => {
              drain(false);
              completeIfDone();
            },
            error: err => subscriber.error(err),
            complete: () => {
              conditionCompleted = true;
              completeIfDone();
            },
          })
      );
    }

    // on unsubscribe: tears down both inner subscriptions
    return subscriptions;
  });
}
In response to a comment, the following is the same function as the one above, but in the form of an RxJS operator and updated to use RxJS 6+ pipeable operators:
/**
 * Pipeable operator that gates its source behind a boolean `condition` stream.
 *
 * While the latest `condition` value is `true`, source values are held in a FIFO
 * buffer; while it is `false` (the initial state) they are passed straight through.
 * When `condition` goes back to `false`, held values are released in arrival order
 * before any newer value is passed through.
 *
 * Errors from either input are forwarded to the subscriber. The result completes once
 * the source has completed and every held value has been released. If `condition` has
 * completed as well, no further `false` can arrive, so whatever is still held is
 * released at that point rather than being kept forever.
 *
 * @param condition Emits `true` to start buffering, `false` to release and pass through.
 * @returns An operator producing the source values, delayed while buffering is active.
 */
function bufferIf<T>(condition: Observable<boolean>): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) => {
    return new Observable<T>(subscriber => {
      const subscriptions = new Subscription();
      const buffer: T[] = [];
      let isBufferOpen = false;
      let sourceCompleted = false;
      let conditionCompleted = false;

      // Releases held values oldest-first. Stops early if the gate closes again while
      // draining (a downstream handler may flip `condition` synchronously), unless
      // `ignoreGate` is set.
      const drain = (ignoreGate: boolean): void => {
        while (buffer.length > 0 && (ignoreGate || !isBufferOpen)) {
          // The length check above guarantees shift() yields a value.
          subscriber.next(buffer.shift() as T);
        }
      };

      // Completes the output once the source is finished and nothing is left to release.
      const completeIfDone = (): void => {
        if (!sourceCompleted) {
          return;
        }
        if (conditionCompleted) {
          // The gate can never change again, so holding values any longer is pointless.
          drain(true);
        }
        if (buffer.length === 0) {
          subscriber.complete();
        }
      };

      // handle source events
      subscriptions.add(
        source.subscribe({
          next: value => {
            // Keep buffering while the gate is closed, and also while earlier values
            // are still being released, so ordering is preserved.
            if (isBufferOpen || buffer.length > 0) {
              buffer.push(value);
            } else {
              subscriber.next(value);
            }
          },
          error: err => subscriber.error(err),
          complete: () => {
            sourceCompleted = true;
            completeIfDone();
          },
        })
      );

      // A synchronous source may already have completed the subscriber; in that case
      // there is nothing left to gate.
      if (!subscriber.closed) {
        // handle condition events
        subscriptions.add(
          condition.pipe(
            tap(con => { isBufferOpen = con; }),
            filter(() => !isBufferOpen)
          ).subscribe({
            next: () => {
              drain(false);
              completeIfDone();
            },
            error: err => subscriber.error(err),
            complete: () => {
              conditionCompleted = true;
              completeIfDone();
            },
          })
        );
      }

      // on unsubscribe: tears down both inner subscriptions
      return subscriptions;
    });
  };
}
Here's another way, slightly briefer (adding a new answer as the previous is rather busy)
// Demo inputs: a 15-value ticking source, and a 6-value boolean gate derived from
// a slower timer (odd ticks map to true, even ticks to false).
const source$ = Rx.Observable
  .timer(0, 200)
  .take(15);
const bufferIt$ = Rx.Observable
  .timer(0, 500)
  .map(tick => tick % 2 !== 0)
  .take(6);
/**
 * Subscribes to `src` immediately and relays it through a new Subject, so that
 * every later subscriber of the returned subject shares that one subscription.
 *
 * Values and errors are relayed. Completion is not relayed (unchanged from the
 * previous behaviour), so the returned subject stays open after `src` ends.
 *
 * @param src The observable to relay.
 * @returns The Subject that re-emits `src`'s values and errors.
 */
const makeHot$ = (src) => {
  const hot$ = new Rx.Subject();
  src.subscribe(
    x => hot$.next(x),
    // Pass source failures on to consumers; without an error handler here they
    // would never reach anything subscribed to the subject.
    err => hot$.error(err)
  );
  return hot$;
}
// Buffered output
// Emits values from `source` one by one while the latest `bufferIt` value is falsy,
// and collects them into a batch while it is truthy; a falsy value signals the
// pending batch to be emitted before switching.
const buffered$ = (source, bufferIt) => {
  const shared$ = makeHot$(source);
  const flush$ = new Rx.Subject();

  // A trailing `false` ensures the last buffer emits.
  const gate$ = bufferIt.concat(Rx.Observable.of(false));

  // Close the previous buffer whenever the gate reports falsy.
  const signalled$ = gate$.do(shouldBuffer => {
    if (!shouldBuffer) {
      flush$.next(true);
    }
  });

  // Every inner stream emits arrays, so mergeAll() flattens them back into values.
  const batches$ = signalled$.switchMap(shouldBuffer => {
    if (shouldBuffer) {
      return shared$.buffer(flush$);
    }
    return shared$.map(value => [value]);
  });

  return batches$.mergeAll();
}
// Proof: log gate changes and gated output together, each with the time elapsed
// since `start`.
const start = new Date();

// Elapsed time between `start` and a timestamped emission.
const elapsedSince = stamped => stamped.timestamp - start;
const describeOutput = stamped =>
  'value: ' + stamped.value + ', elapsed: ' + elapsedSince(stamped);
const describeGate = stamped =>
  (stamped.value ? 'buffer on' : 'buffer off') + ', elapsed: ' + elapsedSince(stamped);

const outputDisplay = buffered$(source$, bufferIt$)
  .timestamp()
  .map(stamped => describeOutput(stamped));
const bufferDisplay = bufferIt$
  .timestamp()
  .map(stamped => describeGate(stamped));

bufferDisplay
  .merge(outputDisplay)
  .subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With