
Poor performance with Concurrent Queue

I have two threads. One collects data from a serial port and places it into an array, which is then added to a concurrent queue. The other thread takes these arrays and plots the data in real time. There are about 150 packets of 50 bytes each per second. The problem is that when this runs, it consumes about 10% CPU, and this is on a very fast Core i7 Haswell. If I add a Thread.Sleep(1) in the consuming thread, CPU utilization drops to 1%. But if I change it to Thread.Sleep(2), the packets are no longer synchronized. So this is not a solution, and on a slower computer it probably will not work.

The code is simple. Here is the producer thread:

    static void FillQueue(byte[] buffer)
    {
        dataQueue.Enqueue(buffer);
    }

and here is the consumer thread:

        while (continuePolling)
        {                
            Thread.Sleep(1); //if removed, there is 10% CPU utilization.  If higher then packet synchronization is lost.

            if (dataQueue.TryDequeue(out result))
            {
                ProcessPacket(result);
            }               
        }

I find this hard to understand since the ProcessPacket method takes about 0.3 ms to execute and is called about 10 times a second.

I have now tried using a blocking collection instead. Here is the producer:

    static BlockingCollection<byte[]> dataQueue = new BlockingCollection<byte[]>();

    public static void addData(byte[] buffer)
    {
        dataQueue.Add(buffer);
    }

and here is the consumer:

    while (dataQueue.TryTake(out result))
    {
        ProcessPacket(result);
    }

There is no difference at all! It uses 10% CPU, and if I add a Thread.Sleep(1) it is fine, but with a Thread.Sleep(2) I lose packets. I don't get it. There are 50 bytes in each packet. That's it. And they are consumed as fast as they are produced. Thanks

Tom asked Feb 22 '18 03:02



1 Answer

That's because you essentially have a while (true) {} loop. ProcessPacket takes 0.3 ms and there are 10 packets per second, so that is 3 ms of useful work per second. For the remaining 997 ms, your loop constantly checks whether there is a new item in the queue, wasting CPU resources.
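
To get a feel for how much work such a loop does, here is a minimal sketch that spins on an empty ConcurrentQueue for a fixed time and counts the failed TryDequeue calls. The class and method names (BusyPollDemo, CountEmptyPolls) are made up for illustration, and the count you see depends entirely on your machine.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;

static class BusyPollDemo
{
    // Spins on an empty ConcurrentQueue for the given duration and
    // returns how many TryDequeue calls came back empty.
    public static long CountEmptyPolls(TimeSpan duration)
    {
        var queue = new ConcurrentQueue<byte[]>();
        var clock = Stopwatch.StartNew();
        long emptyPolls = 0;
        byte[] unused;

        while (clock.Elapsed < duration)
        {
            // TryDequeue never waits: on an empty queue it returns false at once,
            // so the loop goes straight back to polling.
            if (!queue.TryDequeue(out unused))
            {
                emptyPolls++;
            }
        }
        return emptyPolls;
    }
}
```

Calling BusyPollDemo.CountEmptyPolls(TimeSpan.FromMilliseconds(100)) shows how many polls fit into a tenth of a second with nothing to dequeue.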

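Instead, use BlockingCollection together with your queue; it has much better options for your task. It supports a blocking dequeue (with timeouts and cancellation tokens if necessary), so it does not waste CPU while waiting. Note that TryTake without a timeout argument returns immediately, just like TryDequeue, so use Take or TryTake with a timeout: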

var dataQueue = new BlockingCollection<string>(new ConcurrentQueue<string>());
// Add instead of Enqueue
dataQueue.Add("some item");
// Take instead of Dequeue; this blocks until an item is available in the queue
var result = dataQueue.Take();
// TryTake with a timeout blocks until an item is available or the timeout elapses
if (dataQueue.TryTake(out result, TimeSpan.FromMilliseconds(100)))
{
    // an item arrived within 100 ms; process result here
}
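
The timeout and cancellation options mentioned above could be wrapped roughly like this. It is only a sketch: the helper names (BlockingTakeDemo, TakeOrNull, TakeUntilCancelled) are hypothetical, and it assumes byte[] packets as in the question rather than strings.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

static class BlockingTakeDemo
{
    // Waits up to `timeout` for one item. Returns null when the timeout
    // elapses with nothing queued.
    public static byte[] TakeOrNull(BlockingCollection<byte[]> queue, TimeSpan timeout)
    {
        byte[] packet;
        return queue.TryTake(out packet, timeout) ? packet : null;
    }

    // Blocks in Take until an item arrives or the token is cancelled.
    // Returns true if an item was taken, false if the wait was cancelled.
    public static bool TakeUntilCancelled(
        BlockingCollection<byte[]> queue, CancellationToken token, out byte[] packet)
    {
        try
        {
            packet = queue.Take(token);
            return true;
        }
        catch (OperationCanceledException)
        {
            // Take(token) throws when the token is cancelled while waiting.
            packet = null;
            return false;
        }
    }
}
```

In both cases the consuming thread sleeps while the queue is empty, so there is no need to tune a Thread.Sleep value.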

It also supports a "streaming" interface, so your code can be just:

foreach (var result in dataQueue.GetConsumingEnumerable()) {
    ProcessPacket(result);        
}

And instead of the continuePolling flag, call

dataQueue.CompleteAdding();

when no more items are expected (that is, where you would otherwise set continuePolling to false). This makes GetConsumingEnumerable hand out any remaining items and then finish, so the foreach loop ends.
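
Putting the pieces together, a minimal end-to-end sketch might look like the following. The PacketPipeline class, the Run method, and the counter standing in for ProcessPacket are assumptions for illustration, and the for loop with Thread.Sleep(1) only imitates the serial-port thread from the question.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

static class PacketPipeline
{
    // Runs one producer and one consumer and returns the number of packets consumed.
    public static int Run(int packetCount)
    {
        using (var dataQueue = new BlockingCollection<byte[]>(new ConcurrentQueue<byte[]>()))
        {
            int processed = 0;

            // Consumer: blocks inside the enumerator while the queue is empty.
            var consumer = Task.Run(() =>
            {
                foreach (byte[] packet in dataQueue.GetConsumingEnumerable())
                {
                    processed++; // stand-in for ProcessPacket(packet)
                }
            });

            // Producer: hypothetical stand-in for the serial-port thread.
            for (int i = 0; i < packetCount; i++)
            {
                dataQueue.Add(new byte[50]);
                Thread.Sleep(1);
            }

            // Signal that no more packets will arrive; the consumer drains
            // what is left and its foreach loop then ends.
            dataQueue.CompleteAdding();
            consumer.Wait();
            return processed;
        }
    }
}
```

Every packet added before CompleteAdding is still delivered, so PacketPipeline.Run(n) should consume all n packets without any polling delay to tune.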

Evk answered Sep 19 '22 15:09
