Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How can you conditionally buffer with rxjs?

The basic problem I'm trying to solve is that I have a node server that will send log errors and send an error email to our team when an error is thrown. In the event that something goes incredibly wrong, I don't want to spam our email inbox with thousands of emails.

So, the first time a particular error is thrown, send the email. After that, if the same error is thrown within a 5 minute time period, I want to collect those errors and emit them in an array every 5 minutes. I want to use rxjs if possible.

I have set up a simple scenario that almost works. You can see with the groupBy how I can potentially aggregate certain emails by type. The bufferTime() allows it to collect the emails over a time frame that I specify.

The problem I'm running into is this... how can I initially send the first email through, the collect everything else over 5 minutes?

I've tried iif(), buffer(), and a few other things with no luck so far.

        let subject = new Subject<number>();

        subject
            .pipe(
                groupBy(number => number % 2 === 0),
                mergeMap(group => group.pipe(
                    bufferTime(5000)
                )),
                filter(values => values.length > 0)
            )
            .subscribe(value => {
                console.log(value);
            });

        let index = 0;

        let handler = setInterval(() => {
            subject.next(index);
            index++;

            if(index >= 100) {
                clearInterval(handler);
                subject.complete();
            }
        }, 100);

This will output...

[
   0,  2,  4,  6,  8, 10, 12, 14,
  16, 18, 20, 22, 24, 26, 28, 30,
  32, 34, 36, 38, 40, 42, 44
]
[
   1,  3,  5,  7,  9, 11, 13, 15,
  17, 19, 21, 23, 25, 27, 29, 31,
  33, 35, 37, 39, 41, 43, 45
]
[
  46, 48, 50, 52, 54, 56, 58,
  60, 62, 64, 66, 68, 70, 72,
  74, 76, 78, 80, 82, 84, 86,
  88
]
[
  47, 49, 51, 53, 55, 57, 59,
  61, 63, 65, 67, 69, 71, 73,
  75, 77, 79, 81, 83, 85, 87,
  89
]
[ 90, 92, 94, 96, 98 ]
[ 91, 93, 95, 97, 99 ]

What I WANT to output is something like this...

[ 0 ]
[ 1 ]
[
   2,  4,  6,  8, 10, 12, 14,
  16, 18, 20, 22, 24, 26, 28, 30,
  32, 34, 36, 38, 40, 42, 44
]
[
   3,  5,  7,  9, 11, 13, 15,
  17, 19, 21, 23, 25, 27, 29, 31,
  33, 35, 37, 39, 41, 43, 45
]
[
  46, 48, 50, 52, 54, 56, 58,
  60, 62, 64, 66, 68, 70, 72,
  74, 76, 78, 80, 82, 84, 86,
  88
]
[
  47, 49, 51, 53, 55, 57, 59,
  61, 63, 65, 67, 69, 71, 73,
  75, 77, 79, 81, 83, 85, 87,
  89
]
[ 90, 92, 94, 96, 98 ]
[ 91, 93, 95, 97, 99 ]
like image 205
snotmare Avatar asked Mar 23 '26 15:03

snotmare


1 Answers

Based on condition, that needs to print first member of every group without a delay:

this.subject
  .pipe(
    groupBy(number => number % 2 === 0),
      mergeMap(group => {
        return merge(              
          group.pipe(first()),                
          group.pipe(bufferTime(5000))
        )       
      })
  ).subscribe(console.log)
like image 112
Julius Dzidzevičius Avatar answered Mar 26 '26 04:03

Julius Dzidzevičius



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!