Is it possible to count events over a period of time and yield the sum once every second in RxJS? I have a continuous, never-ending stream of events. Every second I would like to get the total number of events over the last 5-minute window. The idea is to use this to populate a real-time graph.
I know how to do this the traditional way but would really like to understand how it's done with reactive programming.
Here's how I'd tackle it.
Create one observable that just counts the number of events received and emits this as a running total (via scan).
Create a second observable which is just the running total delayed by 5 minutes.
Create a third observable which just subtracts the delayed observable from the first observable. This will yield the running total of events that are younger than 5 minutes.
Create a final observable that samples this third observable once per second.
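const totalLast5Minutes = eventSource.publish(events => {
  // Running total of events seen so far; scan's accumulator takes (accumulated, current).
  const runningTotal = events
    .scan((total, e) => total + 1, 0)
    .startWith(0);

  // The same running total, delayed by 5 minutes (300,000 ms).
  const totalDelayed5Minutes = runningTotal
    .delay(5 * 60 * 1000)
    .startWith(0);

  // Total now minus total 5 minutes ago = events younger than 5 minutes.
  return Rx.Observable
    .combineLatest(runningTotal, totalDelayed5Minutes, (t, td) => t - td);
});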
// only sample the value once per second
Rx.Observable
  .interval(1000)
  .withLatestFrom(totalLast5Minutes, (interval, total) => total)
  .subscribe(total => console.log(`total=${total}`));
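To see why the subtraction gives the right number, here is a small sketch in plain JavaScript with no RxJS involved. It only checks the arithmetic behind the approach. The helper names (runningTotalAt, countInLastWindow) and the sample timestamps are made up for illustration and are not part of any library.

```javascript
// Hypothetical helper: running total of events seen up to and including time t (ms).
function runningTotalAt(timestamps, t) {
  return timestamps.filter(ts => ts <= t).length;
}

// Events younger than windowMs at time `now`:
// the running total now, minus the running total as it stood windowMs ago.
function countInLastWindow(timestamps, now, windowMs) {
  return runningTotalAt(timestamps, now) - runningTotalAt(timestamps, now - windowMs);
}

const FIVE_MINUTES = 5 * 60 * 1000;

// Made-up event times, in ms since the stream started.
const sampleEvents = [1000, 2000, 250000, 301500, 400000];

// At t = 302 s the running total is 4, and 5 minutes earlier (t = 2 s) it was 2.
const countAt302s = countInLastWindow(sampleEvents, 302000, FIVE_MINUTES);
console.log(countAt302s); // 2
```

The two events at 1 s and 2 s have aged out of the window by t = 302 s, which is exactly what the delayed running total removes.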
Here's another way of doing it that I think may be a little simpler. You can use windowWithTime (see the ReactiveX and RxJS docs). It can create windows that overlap, so you can have a 5-minute window of events, followed by another 5-minute window that starts one second later, another one second after that, and so on.
If you take the count of each of these windows, you'll get a series of counts of 5 minutes of events, one second apart. It would look something like this:
source.windowWithTime(5 * 60000, 1000) // create 5 minute windows, one second apart
  .flatMap(window => window.count()) // take each window and get the count once it completes (after 5 minutes)
  .subscribe(count => console.log(count));
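I've used this to do exactly what you describe: get a rate of events over a window of time in a non-stop stream of events. Because the windows overlap, you get a moving count of the activity on the stream, one value per second. Note that each count is only emitted when its window closes, so the first value arrives 5 minutes after you subscribe.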
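If the overlapping windows are hard to picture, this plain JavaScript sketch (no RxJS) computes the same kind of counts from a list of timestamps. It is only an illustration of the idea, not how windowWithTime is implemented. The function name slidingWindowCounts and the sample data are made up, and the numbers are scaled down to 5-second windows opening 1 second apart so the result is easy to check by hand.

```javascript
// Count the events in each window [start, start + spanMs), with a new window
// opening every shiftMs. Only windows that have fully closed by endMs are
// reported, since a window's count is only known once the window completes.
function slidingWindowCounts(timestamps, spanMs, shiftMs, endMs) {
  const counts = [];
  for (let start = 0; start + spanMs <= endMs; start += shiftMs) {
    const end = start + spanMs;
    counts.push(timestamps.filter(ts => ts >= start && ts < end).length);
  }
  return counts;
}

// Made-up event times in ms.
const eventTimes = [500, 1200, 4800, 5100, 7900];

// Windows closed by t = 8 s: [0,5), [1,6), [2,7), [3,8) seconds.
const windowCounts = slidingWindowCounts(eventTimes, 5000, 1000, 8000);
console.log(windowCounts); // [ 3, 3, 2, 3 ]
```

Each entry corresponds to one window, so consecutive entries are one shift apart in time, just like the counts logged by the snippet above.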