
Queue operator for RxJS

Tags: rxjs, reactivex

Is there an operator in RxJS that would allow me to buffer items and let them out one by one whenever a signal observable fires? Sort of like bufferWhen, but instead of dumping the whole buffer on each signal it would dump a certain number per signal. It could even dump the number that gets emitted by the signal observable.

Input observable:  >--a--b--c--d--|
Signal observable: >------1---1-1-|
Count in buffer:   !--1--21-2-121-|
Output observable: >------a---b-c-|
Arlen Beiler asked May 18 '17 00:05


2 Answers

Yes, you can use zip to do what you want:

// Runs against RxJS 5 (https://unpkg.com/rxjs@5/bundles/Rx.min.js).
const input = Rx.Observable.from(["a", "b", "c", "d", "e"]);
const signal = new Rx.Subject();
// zip pairs the nth input with the nth signal; the selector keeps only the input.
const output = Rx.Observable.zip(input, signal, (i, s) => i);
output.subscribe(value => console.log(value));
// Each signal releases one buffered value, so this logs a, b, c.
signal.next(1);
signal.next(1);
signal.next(1);

In fact, zip is used as an example in this GitHub issue that pertains to buffering.
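
For anyone wondering why zip acts as a queue here: it holds unpaired values from each source and only emits once it has one from every source. Below is a minimal plain-JavaScript sketch of that pairing, assuming one input and one signal. It is a hand-rolled model, not RxJS code, and makeZipQueue, pushInput and pushSignal are made-up names for illustration.

```javascript
// Hand-rolled model of zip's pairing behaviour; NOT the RxJS implementation.
// makeZipQueue, pushInput and pushSignal are hypothetical names.
function makeZipQueue(onOutput) {
  const inputs = [];        // input values waiting for a signal
  let pendingSignals = 0;   // signals waiting for an input value

  function drain() {
    // Emit while both sides have something to pair up.
    while (inputs.length > 0 && pendingSignals > 0) {
      pendingSignals -= 1;
      onOutput(inputs.shift());
    }
  }

  return {
    pushInput(value) { inputs.push(value); drain(); },
    pushSignal() { pendingSignals += 1; drain(); },
    size() { return inputs.length; }
  };
}

const released = [];
const queue = makeZipQueue(value => released.push(value));
["a", "b", "c", "d", "e"].forEach(value => queue.pushInput(value));
queue.pushSignal();
queue.pushSignal();
queue.pushSignal();
console.log(released);     // a, b and c have been released
console.log(queue.size()); // d and e are still buffered
```

One consequence worth knowing: because zip buffers both sides, a signal that fires while nothing is buffered is not dropped. It is held and releases the next input as soon as that input arrives.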

If you want to use the signal's emitted value to determine how many buffered values are to be released, you could do something like this:

const input = Rx.Observable.from(["a", "b", "c", "d", "e"]);
const signal = new Rx.Subject();
const output = Rx.Observable.zip(
  input,
  // Expand each emitted count into that many individual ticks.
  signal.concatMap(count => Rx.Observable.range(0, count)),
  (i, s) => i
);
output.subscribe(value => console.log(value));
signal.next(1); // releases a
signal.next(2); // releases b and c
signal.next(1); // releases d
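
The data flow of the count-based version can be sketched without RxJS. This is only a rough model that ignores timing, assuming all inputs and counts are known up front; releaseByCounts is a hypothetical helper, not a library function.

```javascript
// Plain JavaScript model (not RxJS); releaseByCounts is a hypothetical helper.
function releaseByCounts(inputs, counts) {
  // Counterpart of concatMap(count => range(0, count)):
  // each count n becomes n ticks, numbered 0 to n - 1.
  const ticks = counts.flatMap(count =>
    Array.from({ length: count }, (_, index) => index)
  );
  // Counterpart of zip(input, ticks, (i, s) => i):
  // pair by position and keep only the input value.
  const pairs = Math.min(inputs.length, ticks.length);
  return { ticks, output: inputs.slice(0, pairs) };
}

const result = releaseByCounts(["a", "b", "c", "d", "e"], [1, 2, 1]);
console.log(result.ticks);  // [0, 0, 1, 0]
console.log(result.output); // ["a", "b", "c", "d"]
```

The tick values themselves are thrown away by the selector. Only how many ticks there are matters.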
cartant answered Jan 04 '23 00:01


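window can be used to split the timeline at each signal, and takeLast holds each window's values back until that window closes. Note that this releases everything buffered since the previous signal (up to 100 values here), as the result below shows.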

    let signal = Rx.Observable.interval(1000).take(4);

    let input = Rx.Observable.interval(300).take(10).share();

    let output = input
        .do(value => console.log(`input = ${value}`))
        .window(signal)
        .do(() => console.log(`*** signal : end OLD and start NEW subObservable`))
        .mergeMap(subObservable => {
            return subObservable.takeLast(100);
        })
        .share();

    output.subscribe(value => console.log(`    output = ${value}`));

    Rx.Observable.merge(input.mapTo(1), output.mapTo(-1))
        .scan((count, diff) => {
            return count + diff;
        }, 0)
        .subscribe(count => console.log(`            count = ${count}`));

Result:

22:28:37.971 *** signal : end OLD and start NEW subObservable  
22:28:38.289 input = 0  
22:28:38.292             count = 1  
22:28:38.575 input = 1  
22:28:38.576             count = 2  
22:28:38.914 input = 2  
22:28:38.915             count = 3  
            <signal received>
22:28:38.977     output = 0  
22:28:38.979             count = 2  
22:28:38.980     output = 1  
22:28:38.982             count = 1  
22:28:38.984     output = 2  
22:28:38.986             count = 0  
22:28:38.988 *** signal : end OLD and start NEW subObservable  
22:28:39.175 input = 3  
22:28:39.176             count = 1  
22:28:39.475 input = 4  
22:28:39.478             count = 2  
22:28:39.779 input = 5  
22:28:39.780             count = 3  
            <signal received>
22:28:39.984     output = 3  
22:28:39.985             count = 2  
22:28:39.986     output = 4  
22:28:39.988             count = 1  
22:28:39.989     output = 5  
22:28:39.990             count = 0  
22:28:39.992 *** signal : end OLD and start NEW subObservable  
22:28:40.075 input = 6  
22:28:40.077             count = 1  
22:28:40.377 input = 7  
22:28:40.378             count = 2  
22:28:40.678 input = 8  
22:28:40.680             count = 3  
22:28:40.987 input = 9  
22:28:40.990             count = 4  
            <input completed>
22:28:40.992     output = 6  
22:28:40.993             count = 3  
22:28:40.995     output = 7  
22:28:40.996             count = 2  
22:28:40.998     output = 8  
22:28:40.999             count = 1  
22:28:41.006     output = 9  
22:28:41.007             count = 0
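
The grouping in that log can be reproduced with a small plain-JavaScript model of the window + takeLast idea. This is a sketch under the assumption that the timeline is given as an ordered list of events. flushPerSignal and the event shape are made up for illustration and are not part of RxJS.

```javascript
// Plain JavaScript model (not RxJS); flushPerSignal is a hypothetical helper.
// Each event is either { input: value } or { signal: true }.
function flushPerSignal(events) {
  let buffer = [];
  const batches = [];
  for (const event of events) {
    if (event.signal) {
      batches.push(buffer); // window closes: everything held is released
      buffer = [];
    } else {
      buffer.push(event.input);
    }
  }
  batches.push(buffer); // input completed: the last window is released too
  return batches;
}

// Same ordering as the log: three inputs, signal, three inputs, signal, four inputs.
const timeline = [
  { input: 0 }, { input: 1 }, { input: 2 }, { signal: true },
  { input: 3 }, { input: 4 }, { input: 5 }, { signal: true },
  { input: 6 }, { input: 7 }, { input: 8 }, { input: 9 }
];
const batches = flushPerSignal(timeline);
console.log(batches); // [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
```

Unlike the zip approach, every signal empties the whole buffer, so this is closer to bufferWhen than to a one-item-per-signal queue.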
thatseeyou answered Jan 04 '23 01:01