Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJS grouping emitted events nodejs

I'm am querying a database and retrieving the results as a row by row stream of events 'db_row_receieved'. I am trying to group these results by company Id, but I am getting no output on the subscription.

The db row format is shown below.

 // row 1
    {
        companyId: 50,
        value: 200
    }
    // row 2   
    {
        companyId: 50,
        value: 300
    }
    // row 3 
    {
        companyId: 51,
        value: 400
    }

Code:

var source = Rx.Observable.fromEvent(eventEmitter, 'db_row_receieved');
var grouped = source.groupBy((x) => { return x.companyId; });
var selectMany = grouped.selectMany(x => x.reduce((acc, v) => {
                             return acc + v.value;
                          }, 0));

var subscription = selectMany.subscribe(function (obs) {
                        console.log("value: ", obs);
                   }

Expected output:

value: 500    // from the group with companyId 50
value: 400    // from the group with companyId 51

Actual output: subscription not outputting anything, but works when using Rx.Observable.fromArray(someArray)

Could anyone tell me where I have gone wrong please?

like image 203
user1145347 Avatar asked Feb 11 '26 12:02

user1145347


1 Answers

So the issue is that reduce will produce a single value only if the underlying stream completed. Since an event emitter is sort of infinite source it is always active.

Take a look at the snippet below - the first example completes, the other does not.

const data = [
  {k: 'A', v: 1},
  {k: 'B', v: 10},
  {k: 'A', v: 1},
  {k: 'B', v: 10},
  {k: 'A', v: 1},
  {k: 'B', v: 10},
  {k: 'A', v: 1},
  {k: 'A', v: 1},
  {k: 'A', v: 1},
];

Rx.Observable.from(data)
  .concatMap(d => Rx.Observable.of(d).delay(100))
  .groupBy(d => d.k)
  .mergeMap(group => group.reduce((acc, value) => {
    acc.sum += value.v;
    return acc;
  }, {key: group.key, sum: 0}))
  .do(d => console.log('RESULT', d.key, d.sum))
  .subscribe();
  
Rx.Observable.from(data)
  .concatMap(d => Rx.Observable.of(d).delay(100))
  .merge(Rx.Observable.never()) // MERGIN NEVER IN
  // .take(data.length) // UNCOMMENT TO MITIGATE NEVER
  .groupBy(d => d.k)
  .mergeMap(group => group.reduce((acc, value) => {
    acc.sum += value.v;
    return acc;
  }, {key: group.key, sum: 0}))
  .do(d => console.log('RESULT - NEVER - WILL NOT BE PRINTED', d))
  .subscribe();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.10/Rx.umd.js"></script>

I do not know your specific use case but 2 most common thing that comes to mind are:

  • use scan (probably with debounce),
  • use takeUntil if there is an event which indicates the end of underling stream.
like image 147
artur grzesiak Avatar answered Feb 14 '26 00:02

artur grzesiak



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!