Use Observable.FromEventPattern to perform action after inactivity or count

I have an observable stream created from an event pattern like seen below.

var keyspaceStream = Observable.FromEventPattern<RedisSubscriptionReceivedEventArgs>(
            h => keyspaceMonitor.KeySpaceChanged += h,
            h => keyspaceMonitor.KeySpaceChanged -= h);

What I want to do is subscribe to the stream and execute a method when either 10 seconds have passed with no events or 100 events have fired since the method last ran. The count limit is there to avoid the scenario where events keep firing, say every 5 seconds, so the inactivity timeout never elapses and the onNext method is never called.

How can I accomplish this? I know how to do the first part (see below) but I can't figure out how to do the counting logic. Note that I already know how to subscribe to the stream.

var throttledStream = keyspaceStream.Throttle(TimeSpan.FromSeconds(10));

Any help would be much appreciated! Thank you.

asked Sep 27 '22 20:09 by Stephen


1 Answer

Use Buffer with a custom bufferClosingSelector. The idea here is that every buffer should be closed either after maxDuration has passed without a new event or after maxCount items, whichever comes sooner. Each time a buffer closes, a new one is opened.

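var maxDuration = TimeSpan.FromSeconds(10); // inactivity window
var maxCount = 100;                         // maximum events per buffer
// Publish shares one subscription to the event source among the inner queries.
var throttledStream = keyspaceStream.Publish(o =>
{
    // Restarts a timer on every event, so it fires only after maxDuration of silence.
    var reachedMaxDuration = o
        .Select(_ => Observable.Timer(maxDuration, scheduler))
        .Switch();
    // The closing signal is re-created each time a buffer closes; it fires on
    // inactivity or once maxCount events have arrived, whichever comes first.
    return o.Buffer(() => o
        .TakeUntil(reachedMaxDuration)
        .Take(maxCount)
        .LastOrDefaultAsync());
});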

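Here scheduler is an IScheduler that you provide (Scheduler.Default, for example). The type of throttledStream will be IObservable<IList<EventPattern<RedisSubscriptionReceivedEventArgs>>>, so each notification carries the list of events collected since the previous buffer closed.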
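If the same batching is needed in more than one place, the query above could be wrapped in a reusable extension method. The following is only a sketch of that idea, assuming the System.Reactive package is referenced; the names ObservableBatchingExtensions, BufferUntilIdleOrCount and maxIdle are made up for illustration and are not part of Rx.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// Hypothetical helper class; the name is illustrative, not an Rx API.
public static class ObservableBatchingExtensions
{
    // Emits a list of buffered items when either maxIdle elapses with no new
    // item, or maxCount items have been collected, whichever happens first.
    public static IObservable<IList<T>> BufferUntilIdleOrCount<T>(
        this IObservable<T> source,
        TimeSpan maxIdle,
        int maxCount,
        IScheduler scheduler)
    {
        // Publish shares a single subscription to source between the
        // buffer itself and the signals that decide when it closes.
        return source.Publish(o =>
        {
            // Each item starts a fresh timer; Switch drops the previous one,
            // so this fires only after maxIdle without any new item.
            var idle = o
                .Select(_ => Observable.Timer(maxIdle, scheduler))
                .Switch();

            // Buffer invokes this selector again every time a buffer closes.
            return o.Buffer(() => o
                .TakeUntil(idle)
                .Take(maxCount)
                .LastOrDefaultAsync());
        });
    }
}
```

With such a helper in scope, the stream from the question could be consumed as keyspaceStream.BufferUntilIdleOrCount(TimeSpan.FromSeconds(10), 100, Scheduler.Default).Subscribe(batch => ...), where the body of the Subscribe callback is whatever method should run for each batch.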

answered Nov 15 '22 06:11 by Timothy Shields