I am trying to make a producer/consumer thread setup more efficient by skipping expensive event operations where possible, with something like:
//cas(variable, compare, set) is atomic compare and swap
//queue is already lock free
running = false

// Add item to queue - producer thread(s)
if(cas(running, false, true))
{
    // We effectively obtained a lock on signalling the event
    add_to_queue()
    signal_event()
}
else
{
    // Most of the time if things are busy we should not be signalling the event
    add_to_queue()
    if(cas(running, false, true))
        signal_event()
}
...
// Process queue, single consumer thread
reset_event()
while(1)
{
    wait_for_auto_reset_event() // Preferably IOCP
    for(int i = 0; i < SpinCount; ++i)
        process_queue()
    cas(running, true, false)
    if(queue_not_empty())
        if(cas(running, false, true))
            signal_event()
}
Obviously, getting these things right is a little tricky(!), so is the above pseudocode correct? A solution that signals the event more often than strictly needed is OK, but not one that signals it for every item.
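For anyone wanting to try the pseudocode, here is a minimal sketch of the `cas(variable, compare, set)` helper it assumes, written in C++ with `std::atomic<bool>`. The free-function wrapper is hypothetical (the question does not say how `cas` is implemented), and it uses the default sequentially consistent ordering, which is the conservative choice for a flag like `running`.

```cpp
#include <atomic>

// Hypothetical helper matching the question's cas(variable, compare, set).
// Atomically stores `set` into `variable` only if it currently equals
// `compare`, and returns true when the swap happened.
inline bool cas(std::atomic<bool>& variable, bool compare, bool set) {
    // compare_exchange_strong overwrites its first argument on failure;
    // `compare` is a by-value copy here, so the caller never observes that.
    return variable.compare_exchange_strong(compare, set);
}
```

With `std::atomic<bool> running{false};`, the first `cas(running, false, true)` returns true and any further attempt returns false until something sets the flag back to false.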
This falls into the sub-category of "stop messing about and go back to work" known as "premature optimisation". :-)
If the "expensive" event operations are taking up a significant portion of time, your design is wrong, and rather than use a producer/consumer you should use a critical section/mutex and just do the work from the calling thread.
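A rough sketch of that alternative in C++, assuming the per-item work is cheap enough to run on the producer's own thread. `DirectProcessor` and its summing "work" are hypothetical placeholders, not anything from the question.

```cpp
#include <mutex>

// Hypothetical stand-in for the per-item work the consumer thread would do.
class DirectProcessor {
public:
    // Called directly by any producer thread. The mutex serialises the work,
    // so no queue, event, or separate consumer thread is involved.
    void process(int item) {
        std::lock_guard<std::mutex> guard(mutex_);
        total_ += item;  // placeholder for the real work
    }

    // Reads the accumulated result under the same lock.
    long total() {
        std::lock_guard<std::mutex> guard(mutex_);
        return total_;
    }

private:
    std::mutex mutex_;
    long total_ = 0;
};
```

The trade-off is that producers now block each other for the duration of the work, which only pays off when that work is short compared with the cost of signalling and waking another thread.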
I suggest you profile your application if you are really concerned.
Updated:
Correct answer:
Producer
ProducerAddToQueue(pQueue, pItem){
    EnterCriticalSection(pQueue->pCritSec);
    // Only signal when the queue goes from empty to non-empty.
    if(IsQueueEmpty(pQueue)){
        SignalEvent(pQueue->hEvent);
    }
    AddToQueue(pQueue, pItem);
    LeaveCriticalSection(pQueue->pCritSec);
}
Consumer
nCheckQuitInterval = 100; // Every 100 ms the consumer checks whether it should quit.
ConsumerRun(pQueue)
{
    while(!ShouldQuit())
    {
        Item* pCurrentItem = NULL;
        EnterCriticalSection(pQueue->pCritSec);
        if(IsQueueEmpty(pQueue))
        {
            ResetEvent(pQueue->hEvent);
        }
        else
        {
            pCurrentItem = RemoveFromQueue(pQueue);
        }
        LeaveCriticalSection(pQueue->pCritSec);
        if(pCurrentItem){
            ProcessItem(pCurrentItem);
            pCurrentItem = NULL;
        }
        else
        {
            // Wait for items to be added.
            WaitForSingleObject(pQueue->hEvent, nCheckQuitInterval);
        }
    }
}
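For comparison, here is a portable sketch of the same idea in standard C++, with `std::mutex` standing in for the critical section and `std::condition_variable` for the event. `SignalOnEmptyQueue` and its method names are hypothetical, and the sketch assumes a single consumer: notifying only on the empty-to-non-empty transition could leave extra consumers asleep.

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical single-consumer queue that only notifies when it goes from
// empty to non-empty, mirroring the IsQueueEmpty() check in the producer.
template <typename T>
class SignalOnEmptyQueue {
public:
    // Returns true if this push had to wake the consumer, i.e. the queue
    // was empty beforehand. Pushes onto a non-empty queue skip the notify.
    bool push(T item) {
        bool wasEmpty;
        {
            std::lock_guard<std::mutex> guard(mutex_);
            wasEmpty = items_.empty();
            items_.push_back(std::move(item));
        }
        if (wasEmpty) {
            ready_.notify_one();
        }
        return wasEmpty;
    }

    // Waits up to `timeout` for an item so the caller can re-check a quit
    // flag between calls, like the nCheckQuitInterval wait in the consumer.
    // Returns false if the wait timed out with nothing queued.
    bool pop(T& out, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> guard(mutex_);
        if (!ready_.wait_for(guard, timeout,
                             [this] { return !items_.empty(); })) {
            return false;
        }
        out = std::move(items_.front());
        items_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<T> items_;
};
```

Because the consumer tests the "queue not empty" predicate while holding the lock, a notification sent before it starts waiting is not lost: it simply sees the item and never blocks.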
Notes:
Assumptions: