Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Count events over a period of time and yield the sum once every second in RxJS

Is it possible to count events over a period of time and yield the sum once every second in RxJS? I have a continuous never ending stream of events. Every 1 second I would like to get the total number of events over the last 5 minute window. The idea is to use this to populate a realtime graph.

I know how to do this the traditional way but would really like to understand how it's done with reactive programming.

like image 312
Toby Avatar asked Oct 26 '15 12:10

Toby


2 Answers

Here's how I'd tackle it.

Create one observable that just counts the number of events received and emits this as a running total (via scan). Create a second observable which is just the running total delayed by 5 minutes. Create a third observable which just subtracts the delayed observable from the first observable. This will yield the running total of events that are younger than 5 minutes. Create a final observable that samples this third observable once per second.

const totalLast5Minutes = eventSource.publish(events => {
    const runningTotal = events
        .scan((e, total) => total + 1, 0)
        .startWith(0);
    const totalDelayed5Minutes = runningTotal
        .delay(5000 * 60)
        .startWith(0);
    return Rx.Observable
        .combineLatest(total, totalDelayed5Minutes, (t, td) => t - td);
});

// only sample the value once per second
Rx.Observable
    .interval(1000)
    .withLatestFrom(totalLast5Minutes, (interval, total) => total)
    .subscribe(total => console.log(`total=${total}`));
like image 99
Brandon Avatar answered Sep 30 '22 11:09

Brandon


Here's another way of doing it that I think may be a little simpler. You can use windowWithTime - see ReactiveX and RxJS doc. You can create windows that overlap, so you can have a 5 minute window of events, followed by another 5 minute window of events that started one second later, and another one second after that, etc.

If you take the count of each of these windows, you'll get a series of counts of 5 minutes of events, one second apart. It would look something like this:

source.windowWithTime(5 * 60000, 1000)   // create 5 minute windows, one second apart
      .flatMap(window => window.count()) // take each window and get the count once it completes (after 5 minutes)
      .subscribe(count => console.log(count));

I've used this to do exactly what you say - get a rate of events over a window of time in a non-stop stream of events. By having the windows overlap, you're generating a moving average of the activity on the stream.

like image 26
Niall Connaughton Avatar answered Sep 30 '22 12:09

Niall Connaughton