Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How would you limit the number of operations per second?

Tags:

c#

.net

How would you limit the number of operations per second?

Lets say we have to copy files from one location to another and we don't want more than 5 files to be processed per second.

Please see what I am doing

private static string currentStamp;
private static int processedInCurrentStamp = 0;

private static void Main(string[] args)
{
    currentStamp = DateTime.Now.ToString("{0:d/M/yyyy HH:mm:ss}");
    Run();
}

private static void Run()
{
    for (int i = 0; i < Int32.MaxValue; i++)
    {
        string s = DateTime.Now.ToString("{0:d/M/yyyy HH:mm:ss}");
        if (currentStamp.Equals(s))
        {
            if (processedInCurrentStamp < 5)
            {
                ProcessItem();
                processedInCurrentStamp++;
            }
        }
        else
        {
            Console.WriteLine("{0} ::: {1}", currentStamp, processedInCurrentStamp);
            currentStamp = s;
            processedInCurrentStamp = 0;
        }
    }
}

But I need a more elegant and bullet proof way.

like image 230
Sumee Avatar asked Nov 03 '10 07:11

Sumee


4 Answers

Get the starting time, and then in the loop calculate the maximum number of files that should be processed up to the current time, and sleep if you are ahead:

DateTime start = DateTime.UtcNow;

int i = 1;
while (i <= 100) {

  int limit = (int)((DateTime.UtcNow - start).TotalSeconds * 5.0);

  if (i <= limit) {

    Console.WriteLine(i);
    i++;

  } else {
    Thread.Sleep(100);
  }

}

This way the code will catch up if some operations take longer. If you only get three operations one second, it can do seven the next second.

Note that I am using UtcNow instead of Now to avoid the nasty jump in time that happens twice a year.

Edit:

Another alternative is to measure the time that an operation takes, and sleep the rest of it's time slot:

for (int i = 1; i <= 100; i++ ) {

  DateTime start = DateTime.UtcNow;

  Console.WriteLine(i);

  int left = (int)(start.AddSeconds(1.0 / 5.0) - DateTime.UtcNow).TotalMilliseconds;
  if (left > 0) {
    Thread.Sleep(left);
  }

}
like image 60
Guffa Avatar answered Sep 19 '22 01:09

Guffa


I would make a simple method that processes five files at a time and call that every second using a timer.

like image 45
Brian Rasmussen Avatar answered Sep 18 '22 01:09

Brian Rasmussen


Use a token bucket:

http://en.wikipedia.org/wiki/Token_bucket

like image 30
Martin Broadhurst Avatar answered Sep 20 '22 01:09

Martin Broadhurst


Sumee

You could use a throttled producer/ consumer queue. It would have a background thread which runs at an interval, processing your file. You could enqueue the file names as they arrive and the throttled dequeue action (Action) would dequeue. Here is an example of what I mean:

    public class ThrottledQueue<T> : IDisposable where T : class
{
    readonly object _locker = new object();
    readonly List<Thread> _workers;
    readonly Queue<T> _taskQueue = new Queue<T>();
    readonly Action<T> _dequeueAction;
    readonly TimeSpan _throttleTimespan;
    readonly Thread _workerThread;
    /// <summary>
    /// Initializes a new instance of the <see cref="SuperQueue{T}"/> class.
    /// </summary>
    /// <param name="millisecondInterval">interval between throttled thread invokation</param>
    /// <param name="dequeueAction">The dequeue action.</param>
    public ThrottledQueue(int millisecondInterval, Action<T> dequeueAction)
    {
        _dequeueAction = dequeueAction;

        // Create and start a separate thread for each worker
        _workerThread = new Thread(Consume) { IsBackground = true, Name = string.Format("ThrottledQueue worker") };
        _workerThread.Start();
        _throttleTimespan = new TimeSpan(0,0,0,0,millisecondInterval);

    }


    /// <summary>
    /// Enqueues the task.
    /// </summary>
    /// <param name="task">The task.</param>
    public void EnqueueTask(T task)
    {
        lock (_locker)
        {
            _taskQueue.Enqueue(task);
        }
    }

    /// <summary>
    /// Consumes this instance.
    /// </summary>
    void Consume()
    {
        while (true)
        {
            T item = default(T);
            lock (_locker)
            {
                Monitor.Wait(_locker, _throttleTimespan);
                if (_taskQueue.Count != 0) 
                    item = _taskQueue.Dequeue();
            }
            if (item == null) return;

            // run actual method
            _dequeueAction(item);
        }
    }

    /// <summary>
    /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
    /// </summary>
    public void Dispose()
    {
        // Enqueue one null task per worker to make each exit.
        EnqueueTask(null);

        _workerThread.Join();

    }
}
like image 39
dashton Avatar answered Sep 18 '22 01:09

dashton