I need to cap the number of events n permitted during a time period deltaT. Any approach I can think of, space is O(m), where m is the maximum number of eventrequests sent per deltaT, or O(deltaT/r), where r is an acceptable resolution.
Edit: deltaT is a sliding time window relative to the timestamp.
For instance: Keep a circular buffer of event timestamps. On event crop all earlier timestamps than t-deltaT. Deny event if the number of timestamps exceeds n. Add timestamp to the buffer.
Or, init a circular bucket buffer of integers of size deltaT/r indexed by time relative to the current with resolution r. Maintain pointer i. On event, increment i by time since last event divided by r. Zero the buffer between the original i and the new one. Increment at i. Deny, if the sum of the bugger exceeds n.
What's a better way?
I just implemented my second suggestion above in c# with a fixed deltaT of 1 s and a fixed resolution of 10 ms.
public class EventCap
{
private const int RES = 10; //resolution in ms
private int _max;
private readonly int[] _tsBuffer;
private int p = 0;
private DateTime? _lastEventTime;
private int _length = 1000 / RES;
public EventCap(int max)
{
_max = max;
_tsBuffer = new int[_length];
}
public EventCap()
{
}
public bool Request(DateTime timeStamp)
{
if (_max <= 0)
return true;
if (!_lastEventTime.HasValue)
{
_lastEventTime = timeStamp;
_tsBuffer[0] = 1;
return true;
}
//A
//Mutually redundant with B
if (timeStamp - _lastEventTime >= TimeSpan.FromSeconds(1))
{
_lastEventTime = timeStamp;
Array.Clear(_tsBuffer, 0, _length);
_tsBuffer[0] = 1;
p = 0;
return true;
}
var newP = (timeStamp - _lastEventTime.Value).Milliseconds / RES + p;
if (newP < _length)
Array.Clear(_tsBuffer, p + 1, newP - p);
else if (newP > p + _length)
{
//B
//Mutually redundant with A
Array.Clear(_tsBuffer, 0, _length);
}
else
{
Array.Clear(_tsBuffer, p + 1, _length - p - 1);
Array.Clear(_tsBuffer, 0, newP % _length);
}
p = newP % _length;
_tsBuffer[p]++;
_lastEventTime = timeStamp;
var sum = _tsBuffer.Sum();
return sum <= 10;
}
}
What about having these variables: num_events_allowed, time_before, time_now, time_passed
At init time you will do: time_before = system.timer(), num_events_allowed = n
When an event is received you do the following:
time_now = system.timer()
time_passed = time_now - time_before
time_before = time_now
num_events_allowed += time_passed * (n / deltaT);
if num_events_allowed > n
num_events_allowed = n
if num_events_allowed >= 1
let event through, num_events_allowed -= 1
else
ignore event
Whats nice about this algorithm is the num_events_allowed is actually incremented by the time that has passed since the last event and the rate of which events can be received, that way you get an incrementation of the number of events you can send per that time_passed in order to stay in the limit of n. So if you get an event too soon, you will increment it by less than 1, if its after too much time you will increment it by more than one. Of course if the event goes through you decrement the allowance by 1 as you just got an event. If the allowance passes the max events which is n , you return it back to n as you cant allow more than n in any time phase. If the allowance is less than 1, you cant send a whole event, dont let it through!
This is the leaky bucket algorithm: https://en.wikipedia.org/wiki/Leaky_bucket
One way to keep the sliding window and still have it O(1) + very small O(n) for each incoming request is to make a suitable sized array of ints and keep it as a circular buffer and discretize incoming requests (the requests as integrated as with the sampled levels as in a A/D-converter, or as a histogram if you are a statistican) and keep track of the sum of the circular buffer, like this
assumptions:
"there can be no more than 1000 request per minute" and
"we discretize on every second"
int[] buffer = new zeroed int-array with 60 zeroes
int request-integrator = 0 (transactional)
int discretizer-integrator = 0 (transactional)
for each request:
check if request-integrator < 1000 then
// the following incs could be placed outside
// the if statement for saturation on to many
// requests (punishment)
request-integrator++ // important
discretizer-integrator++
proceed with request
once every second: // in a transactional memory transaction, for God's saké
buffer-index++
if (buffer-index = 60) then buffer-index=0 // for that circular buffer feeling!
request-integrator -= buffer[buffer-index] // clean for things happening one minute ago
buffer[buffer-index] = discretizer-integrator // save this times value
discretizer-integrator = 0 // resetting for next sampling period
Note that the increase of the request-integrator "could" be done just once every second, but that leaves a hole open for saturating it with 1000 requests or worse in one second about once every minute depending on punishment behaviour.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With