I have an observable stream created from an event pattern, as shown below.
var keyspaceStream = Observable.FromEventPattern<RedisSubscriptionReceivedEventArgs>(
    h => keyspaceMonitor.KeySpaceChanged += h,
    h => keyspaceMonitor.KeySpaceChanged -= h);
What I want to do is subscribe to the stream and execute a method when either 10 seconds have passed with no events, or 100 events have fired since the method last ran. The second condition avoids the scenario where events fire every 5 seconds and the onNext method is never called.
How can I accomplish this? I know how to do the first part (see below) but I can't figure out how to do the counting logic. Note that I already know how to subscribe to the stream.
var throttledStream = keyspaceStream.Throttle(TimeSpan.FromSeconds(10));
Any help would be much appreciated! Thank you.
Use Buffer with a custom bufferClosingSelector. The idea here is that every buffer should be closed either after maxDuration of inactivity or after maxCount items, whichever comes sooner. Each time a buffer closes, a new one is opened.
var maxDuration = TimeSpan.FromSeconds(10);
var maxCount = 100;
var throttledStream = keyspaceStream.Publish(o =>
{
    var reachedMaxDuration = o
        .Select(_ => Observable.Timer(maxDuration, scheduler))
        .Switch();
    return o.Buffer(() => o
        .TakeUntil(reachedMaxDuration)
        .Take(maxCount)
        .LastOrDefaultAsync());
});
I assume you provide the IScheduler scheduler. The type of throttledStream will be IObservable<IList<EventPattern<RedisSubscriptionReceivedEventArgs>>>.
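To try the approach without a Redis connection, here is a minimal sketch that wraps the same operator chain in an extension method and feeds it from a Subject<int>. The helper name BufferUntilIdleOrCount, the use of Scheduler.Default, and the shortened limits (1 second, 3 items) are assumptions made for illustration; they are not part of the original code. It assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

static class BufferDemo
{
    // Hypothetical helper (not from the original answer): same operator chain,
    // made generic so it can be exercised with any source.
    public static IObservable<IList<T>> BufferUntilIdleOrCount<T>(
        this IObservable<T> source, TimeSpan maxDuration, int maxCount, IScheduler scheduler)
    {
        return source.Publish(o =>
        {
            // Restart a timer on every event; it only fires after a quiet period.
            var reachedMaxDuration = o
                .Select(_ => Observable.Timer(maxDuration, scheduler))
                .Switch();

            // Close the current buffer when the timer fires or maxCount items arrive.
            return o.Buffer(() => o
                .TakeUntil(reachedMaxDuration)
                .Take(maxCount)
                .LastOrDefaultAsync());
        });
    }

    static void Main()
    {
        var events = new Subject<int>(); // stand-in for keyspaceStream

        using var subscription = events
            .BufferUntilIdleOrCount(TimeSpan.FromSeconds(1), 3, Scheduler.Default)
            .Subscribe(batch => Console.WriteLine($"Flushing {batch.Count} event(s)"));

        // A burst of events should be flushed by the count limit,
        // and any remainder by the inactivity timer.
        for (var i = 0; i < 7; i++) events.OnNext(i);

        Thread.Sleep(TimeSpan.FromSeconds(2)); // give the inactivity timer time to fire
    }
}
```

In the original setting, the Subscribe callback is where the method from the question would be invoked, once per emitted batch.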