I have two threads. One is collecting data from a serial port and placing it into an array that is then added to a concurrent queue. The other thread, is taking these arrays and plotting the data in real time. There are about 150 packets of data of 50 bytes each per second. The problem I am having is that when this runs, it is consuming about 10% CPU. And this is on a very fast core I7 Haswell. If I add a Thread.Sleep(1) in the consuming thread then the CPU utilization drops to 1%. The problem is if I put this at Thread.Sleep(2) then the packets are not synchronized anymore. So this is not a solution, and if it is run on a slower computer it probably will not work.
The code is simple. Here is the producer thread:
static void FillQueue(byte[] buffer)
{
dataQueue.Enqueue(buffer);
}
and here is the consumer thread:
while (continuePolling)
{
Thread.Sleep(1); //if removed, there is 10% CPU utilization. If higher then packet synchronization is lost.
if (dataQueue.TryDequeue(out result))
{
ProcessPacket(result);
}
}
I find this hard to understand since the ProcessPacket method takes about 0.3 ms to execute and is called about 10 times a second.
I have now tried using a blocking collection instead. Here is the producer
BlockingCollection<byte[]> dataQueue = new BlockingCollection<byte[]>;
public static void addData(buffer)
{
dataQueue.add(buffer);
}
and here is the consumer
while (dataQueue.TryTake(out result)
{
ProcessPacket(result);
}
There is no difference at all! It uses 10% CPU and if I add a Thread.Sleep(1) is is fine, but a Thread.Sleep(2) and I loose packets I don't get it. There are 50 bytes in each packet. That's it. And they are consumed as fast as produced. Thanks
That's because you essentially have while (true) {}
loop. ProcessPacket
takes 0.3ms, and there are 10 packets per second, so 3ms of useful work per second. The rest 997ms your loop constantly checks if there is new item in queue, wasting your CPU resources.
Instead, use BlockingCollection
together with your queue, which has much better options for your task. It supports blocking dequeue (with timeouts and cancellation tokens if necessary), so not wastes CPU:
var dataQueue = new BlockingCollection<string>(new ConcurrentQueue<string>());
// Add instead of Enqueue
dataQueue.Add("some item");
// Take instead of Dequeue, this will block until item is available in queue
var result = dataQueue.Take();
// blocks until item is available or timeout happens
if (dataQueue.TryTake(out result, TimeSpan.FromMilliseconds(100)))
It also supports "streaming" interface, so your code can be just:
foreach (var result in dataQueue.GetConsumingEnumerable()) {
ProcessPacket(result);
}
And instead of continuePolling
flag - call
dataQueue.CompleteAdding();
When no more items are expected (so, where you otherwise would set continuePolling
to false) - this will make GetConsumingEnumerable
to finish providing items and return.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With