Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Most efficient way to process a queue with threads

I have a queue onto which pending fourier transform requests (comparatively time consuming operations) are placed - we could get thousands of transform requests per second in some cases, so its gotta be quick.

I'm upgrading the old code to use .net 4, as well as porting to TPL. I'm wondering what the most efficient (fastest throughput) way to handle this queue looks like. I'd like to use all cores available.

Currently I am experimenting with a BlockingCollection. I create a queue handler class that spawns 4 tasks, which block on the BlockingCollection and wait for incoming work. They then process that pending transform. Code:

public class IncomingPacketQueue : IDisposable
    {
        BlockingCollection<IncomingPacket> _packetQ = new BlockingCollection<IncomingPacket>();

        public IncomingPacketQueue(int workerCount)
        {
            for (int i = 0; i < workerCount; i++)
            {
                Task.Factory.StartNew(Consume);
            }
        }

        public void EnqueueSweep(IncomingPacket incoming)
        {
            _packetQ.Add(incoming);
        }

        private void Consume()
        {
            foreach (var sweep in _packetQ.GetConsumingEnumerable())
            {
                //do stuff
                var worker = new IfftWorker();
                Trace.WriteLine("  Thread {0} picking up a pending ifft".With(Thread.CurrentThread.ManagedThreadId));
                worker.DoIfft(sweep);                

            }
        }

        public int QueueCount
        {
            get
            {
                return _packetQ.Count;
            }
        }

    #region IDisposable Members

    public void Dispose()
    {
        _packetQ.CompleteAdding();
    }

    #endregion
    }

Does this look like a good solution? It seems to max out all cores - although I'm currently unsure how many workers I should spawn in my constructor.

like image 955
Matt Roberts Avatar asked Jun 01 '11 15:06

Matt Roberts


2 Answers

That looks reasonable. I've found BlockingCollection to be quite fast. I use it to process tens of thousands of requests per second.

If your application is processor bound, then you probably don't want to create more workers than you have cores. Certainly you don't want to create a lot more workers than cores. On a quad core machine, if you expect most of the time to be spent doing the FFTs, then four workers will eat all the CPU. More workers just means more that you have thread context switches to deal with. The TPL will typically balance that for you, but there's no reason to create, say, 100 workers when you can't handle more than a handful.

I would suggest that you run tests with 3, 4, 5, 6, 7, and 8 workers. See which one gives you the best throughput.

like image 188
Jim Mischel Avatar answered Oct 29 '22 14:10

Jim Mischel


I agree with Jim. Your approach looks really good. You are not going to get much better this. I am not an FFT expert, but I am assuming these operations are nearly 100% CPU bound. If that is indeed the case then a good first guess at the number of workers would be a direct 1-to-1 correlation with the number of cores in the machine. You can use Environment.ProcessorCount to get this value. You could experiment with a multiplier of say 2x or 4x, but again, if these operations are CPU bound then anything higher than 1x might just cause more overhead. Using Environment.ProcessorCount would make your code more portable.

Another suggestion...let the TPL know that these are dedicated threads. You can do this by specifying the LongRunning option.

public IncomingPacketQueue()
{
    for (int i = 0; i < Environment.ProcessorCount; i++)
    {
        Task.Factory.StartNew(Consume, TaskCreationOptions.LongRunning);
    }
}
like image 42
Brian Gideon Avatar answered Oct 29 '22 13:10

Brian Gideon