
RxJs: buffer events when condition is true, pass events through when condition is false

I wrote the function below, which builds an Observable that behaves as described in the title. Does anyone know of a more concise way to achieve the same behaviour using the operators that ship with RxJS? I looked at bufferToggle, which comes close, but I need emitted values to pass straight through while the buffer is closed.

Function Description: Buffers the source values while the latest value emitted by the condition is true, and passes them through while it is false. When the condition emits false after having been true, the buffer releases its values in the order in which they were received. Until the condition first emits true, source values are passed through.

import { Observable } from 'rxjs/Observable';
import { Subscription } from 'rxjs/Subscription';
import 'rxjs/add/operator/do';
import 'rxjs/add/operator/filter';

function bufferIf<T>(condition: Observable<boolean>, source: Observable<T>): Observable<T> {
  return new Observable<T>(subscriber => {
    const subscriptions: Subscription[] = [];
    const buffer: T[] = [];
    let isBufferOpen = false;

    subscriptions.push(
      // handle source events
      source.subscribe(value => {
        // Buffer while the buffer is open, or while it has closed but is
        // still being emptied, so that values keep their arrival order.
        if (isBufferOpen || buffer.length > 0) {
          buffer.push(value);
        } else {
          subscriber.next(value);
        }
      }),

      // handle condition events
      condition.do(value => isBufferOpen = value)
        .filter(value => !value)
        .subscribe(() => {
          // release buffered values in order; stop if the buffer reopens mid-flush
          while (buffer.length > 0 && !isBufferOpen) {
            subscriber.next(buffer.shift());
          }
        })
    );

    // on unsubscribe
    return () => {
      subscriptions.forEach(sub => sub.unsubscribe());
    };
  });
}
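
The buffering rules in the function description can also be checked without RxJS. The following is a minimal synchronous sketch, not a replacement for the operator: `BufferGate`, `next` and `setCondition` are hypothetical names introduced only for this illustration, and the two methods stand in for the source and condition subscriptions.

```typescript
// BufferGate is a hypothetical name used only for this sketch; it is not an RxJS API.
class BufferGate<T> {
  private readonly buffer: T[] = [];
  private readonly emit: (value: T) => void;
  private isBufferOpen = false; // pass-through until the condition first emits true

  constructor(emit: (value: T) => void) {
    this.emit = emit;
  }

  // Called for each source value.
  next(value: T): void {
    // Buffer while open, or while a flush is still draining.
    if (this.isBufferOpen || this.buffer.length > 0) {
      this.buffer.push(value);
    } else {
      this.emit(value);
    }
  }

  // Called for each condition value.
  setCondition(open: boolean): void {
    this.isBufferOpen = open;
    // Flush in arrival order; stops early if the buffer is reopened mid-flush.
    while (this.buffer.length > 0 && !this.isBufferOpen) {
      this.emit(this.buffer.shift() as T);
    }
  }
}

const log: string[] = [];
const gate = new BufferGate<number>(v => log.push(`emit ${v}`));

gate.next(1);                 // buffer closed: passes through
log.push('buffer on');
gate.setCondition(true);
gate.next(2);                 // held
gate.next(3);                 // held
log.push('buffer off');
gate.setCondition(false);     // releases 2, then 3
gate.next(4);                 // passes through again

console.log(log);
// ['emit 1', 'buffer on', 'buffer off', 'emit 2', 'emit 3', 'emit 4']
```

Values 2 and 3 only appear after 'buffer off', which is the behaviour that bufferToggle alone does not give, since it drops whatever arrives while no buffer is open.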

Edit

In response to a comment, here is the same function as above, rewritten as an RxJS operator and updated to use RxJS 6+ pipeable operators:

import { MonoTypeOperatorFunction, Observable, Subscription } from 'rxjs';
import { filter, tap } from 'rxjs/operators';

function bufferIf<T>(condition: Observable<boolean>): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) => {
    return new Observable<T>(subscriber => {
      const subscriptions: Subscription[] = [];
      const buffer: T[] = [];
      let isBufferOpen = false;

      subscriptions.push(
        // handle source events
        source.subscribe(value => {
          // Buffer while the buffer is open, or while it has closed but is
          // still being emptied, so that values keep their arrival order.
          if (isBufferOpen || buffer.length > 0) {
            buffer.push(value);
          } else {
            subscriber.next(value);
          }
        }),

        // handle condition events
        condition.pipe(
          tap(con => isBufferOpen = con),
          filter(() => !isBufferOpen)
        ).subscribe(() => {
          // release buffered values in order; stop if the buffer reopens mid-flush
          while (buffer.length > 0 && !isBufferOpen) {
            subscriber.next(buffer.shift() as T);
          }
        })
      );

      // on unsubscribe
      return () => subscriptions.forEach(sub => sub.unsubscribe());
    });
  };
}
Sam Herrmann asked Nov 11 '17 20:11



1 Answer

Using a hot observable

Here's another, slightly briefer way (posted as a new answer because the previous one is rather busy).

// Source and buffering observables
const source$ = Rx.Observable.timer(0, 200).take(15)
const bufferIt$ = Rx.Observable.timer(0, 500).map(x => x % 2 !== 0).take(6)

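// Share a single subscription to the source through a Subject, so that
// re-subscribing inside switchMap does not restart a cold source
const makeHot$ = (src) => {
  const hot$ = new Rx.Subject();
  src.subscribe(x => hot$.next(x));
  return hot$;
}

// Buffered output
const buffered$ = (source, bufferIt) => {
  const hot$ = makeHot$(source)
  const close = new Rx.Subject()         // signals the open buffer to emit
  return bufferIt
    .concat(Rx.Observable.of(false))       // ensure last buffer emits
    .do(x => {if(!x) close.next(true)} )   // close previous buffer
    // true: collect values until close fires; false: wrap each value in an array
    .switchMap(x => x ? hot$.buffer(close) : hot$.map(v => [v]))
    .mergeAll()                            // flatten the arrays back into single values
}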

// Proof
const start = new Date()
const outputDisplay = buffered$(source$, bufferIt$).timestamp()
  .map(x => 'value: ' + x.value + ', elapsed: ' + (x.timestamp - start) )
const bufferDisplay = bufferIt$.timestamp()
  .map(x => (x.value ? 'buffer on' : 'buffer off') + ', elapsed: ' + (x.timestamp - start) )
bufferDisplay.merge(outputDisplay)
  .subscribe(console.log)
// Runs against RxJS 5.5.2: https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js
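
The reason for making the source hot is that switchMap subscribes to its inner observable again on every value from the buffering observable, and a cold source such as a timer would start over from its first value on each of those subscriptions. Here is a rough, library-free sketch of that difference. `coldSource` and `HotRelay` are hypothetical stand-ins written for this illustration and are not RxJS APIs.

```typescript
// Hypothetical minimal stand-ins for illustration only; these are not RxJS APIs.
type Listener<T> = (value: T) => void;

// "Cold": each subscribe call runs the producer again from the start.
const coldSource = {
  subscribe(listener: Listener<number>): void {
    for (let i = 0; i < 3; i++) listener(i);
  },
};

// "Hot": one shared stream; a listener only sees values pushed while it is subscribed.
class HotRelay<T> {
  private listeners: Listener<T>[] = [];

  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }

  next(value: T): void {
    for (const l of [...this.listeners]) l(value);
  }
}

// Subscribing to the cold source twice replays it from the beginning.
const coldSeen: number[] = [];
coldSource.subscribe(v => coldSeen.push(v));
coldSource.subscribe(v => coldSeen.push(v));

// Swapping listeners on the hot relay continues from wherever the stream is.
const hotSeen: string[] = [];
const relay = new HotRelay<number>();
const stopFirst = relay.subscribe(v => hotSeen.push(`first:${v}`));
relay.next(0);
relay.next(1);
stopFirst(); // comparable to switchMap dropping the previous inner subscription
relay.subscribe(v => hotSeen.push(`second:${v}`));
relay.next(2);

console.log(coldSeen); // [0, 1, 2, 0, 1, 2]
console.log(hotSeen);  // ['first:0', 'first:1', 'second:2']
```

In the answer's code, makeHot$ plays the role of the relay: the timer is subscribed once, and each switch between buffering and pass-through picks up the values that arrive from that point on.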
Richard Matsen answered Nov 15 '22 03:11
