I'm am querying a database and retrieving the results as a row by row stream of events 'db_row_receieved'. I am trying to group these results by company Id, but I am getting no output on the subscription.
The db row format is shown below.
// row 1
{
companyId: 50,
value: 200
}
// row 2
{
companyId: 50,
value: 300
}
// row 3
{
companyId: 51,
value: 400
}
Code:
var source = Rx.Observable.fromEvent(eventEmitter, 'db_row_receieved');
var grouped = source.groupBy((x) => { return x.companyId; });
var selectMany = grouped.selectMany(x => x.reduce((acc, v) => {
return acc + v.value;
}, 0));
var subscription = selectMany.subscribe(function (obs) {
console.log("value: ", obs);
}
Expected output:
value: 500 // from the group with companyId 50
value: 400 // from the group with companyId 51
Actual output: subscription not outputting anything, but works when using Rx.Observable.fromArray(someArray)
Could anyone tell me where I have gone wrong please?
So the issue is that reduce will produce a single value only if the underlying stream completed. Since an event emitter is sort of infinite source it is always active.
Take a look at the snippet below - the first example completes, the other does not.
const data = [
{k: 'A', v: 1},
{k: 'B', v: 10},
{k: 'A', v: 1},
{k: 'B', v: 10},
{k: 'A', v: 1},
{k: 'B', v: 10},
{k: 'A', v: 1},
{k: 'A', v: 1},
{k: 'A', v: 1},
];
Rx.Observable.from(data)
.concatMap(d => Rx.Observable.of(d).delay(100))
.groupBy(d => d.k)
.mergeMap(group => group.reduce((acc, value) => {
acc.sum += value.v;
return acc;
}, {key: group.key, sum: 0}))
.do(d => console.log('RESULT', d.key, d.sum))
.subscribe();
Rx.Observable.from(data)
.concatMap(d => Rx.Observable.of(d).delay(100))
.merge(Rx.Observable.never()) // MERGIN NEVER IN
// .take(data.length) // UNCOMMENT TO MITIGATE NEVER
.groupBy(d => d.k)
.mergeMap(group => group.reduce((acc, value) => {
acc.sum += value.v;
return acc;
}, {key: group.key, sum: 0}))
.do(d => console.log('RESULT - NEVER - WILL NOT BE PRINTED', d))
.subscribe();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.10/Rx.umd.js"></script>
I do not know your specific use case but 2 most common thing that comes to mind are:
scan (probably with debounce),takeUntil if there is an event which indicates the end of underling stream.If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With