
Efficient consumer thread with multiple producers

I am trying to make a producer/consumer thread setup more efficient by skipping expensive event operations when they are not needed, with something like:

//cas(variable, compare, set) is atomic compare and swap
//queue is already lock free

running = false


// Add item to queue – producer thread(s)

if(cas(running, false, true))
{
  // We effectively obtained a lock on signalling the event
  add_to_queue()
  signal_event()
}
else
{
  // Most of the time if things are busy we should not be signalling the event
  add_to_queue()

  if(cas(running, false, true))
    signal_event()
}

...

// Process queue, single consumer thread

reset_event()

while(1)
{
  wait_for_auto_reset_event() // Preferably IOCP

  for(int i = 0; i < SpinCount; ++i)
    process_queue()

  cas(running, true, false)

  if(queue_not_empty())
    if(cas(running, false, true))
      signal_event()
}

Getting these things right is obviously a little tricky(!), so is the above pseudocode correct? A solution that signals the event more often than strictly needed is OK, but not one that signals it for every item.
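For illustration only, here is a minimal C++ sketch of the same idea: signal the event only when the consumer may be idle. It is a simplified variant, not the exact pseudocode above: producers always enqueue first and then try to flip the flag with a single atomic exchange. `AutoResetEvent`, `Queue`, `Channel`, `produce` and `consume` are hypothetical names; the mutex-based queue merely stands in for the lock-free queue, and the condition-variable event stands in for an auto-reset event or IOCP.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for an auto-reset event: one set() releases one wait().
class AutoResetEvent {
public:
    void set() {
        { std::lock_guard<std::mutex> lock(m_); signaled_ = true; }
        cv_.notify_one();
    }
    void wait() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return signaled_; });
        signaled_ = false;  // auto-reset
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    bool signaled_ = false;
};

// Hypothetical stand-in for the lock-free queue (a mutex keeps the sketch short).
class Queue {
public:
    void push(int v) { std::lock_guard<std::mutex> l(m_); items_.push_back(v); }
    bool pop(int& v) {
        std::lock_guard<std::mutex> l(m_);
        if (items_.empty()) return false;
        v = items_.front();
        items_.pop_front();
        return true;
    }
    bool empty() { std::lock_guard<std::mutex> l(m_); return items_.empty(); }
private:
    std::mutex m_;
    std::deque<int> items_;
};

struct Channel {
    Queue queue;
    AutoResetEvent event;
    std::atomic<bool> running{false};  // true while the consumer is awake or being woken
    std::atomic<int> signals{0};       // counts event signals, for inspection only
};

// Producer: enqueue first, then signal only if the flag was previously false.
void produce(Channel& ch, int item) {
    ch.queue.push(item);
    if (!ch.running.exchange(true)) {  // exchange returns the previous value
        ch.signals.fetch_add(1);
        ch.event.set();
    }
}

// Single consumer: processes `expected` items and returns their sum.
long consume(Channel& ch, int expected) {
    assert(expected >= 0);
    long sum = 0;
    int seen = 0;
    while (seen < expected) {
        ch.event.wait();
        for (;;) {
            int v;
            while (ch.queue.pop(v)) { sum += v; ++seen; }
            ch.running.store(false);
            // Re-check after clearing the flag so a racing push is not missed.
            if (ch.queue.empty()) break;           // idle: the next producer signals
            if (ch.running.exchange(true)) break;  // a producer took the flag and signals
            // Otherwise the consumer reclaimed the flag and keeps draining.
        }
    }
    return sum;
}
```

With several producer threads calling `produce` and one thread calling `consume`, `signals` should stay well below the item count while the consumer is busy, since only a false-to-true transition of `running` triggers a signal. The sketch relies on the default sequentially consistent ordering of `std::atomic`; whether weaker orderings would be safe is not something it tries to establish.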

asked Aug 26 '10 06:08 by iam



1 Answer

This falls into the sub-category of "stop messing about and go back to work" known as "premature optimisation". :-)

If the "expensive" event operations are taking up a significant portion of time, your design is wrong, and rather than use a producer/consumer you should use a critical section/mutex and just do the work from the calling thread.

I suggest you profile your application if you are really concerned.

Updated:

Correct answer:

Producer

ProducerAddToQueue(pQueue, pItem)
{
    EnterCriticalSection(pQueue->pCritSec);
        // Signal only on the empty -> non-empty transition.
        if(IsQueueEmpty(pQueue))
        {
            SetEvent(pQueue->hEvent);
        }

        AddToQueue(pQueue, pItem);
    LeaveCriticalSection(pQueue->pCritSec);
}

Consumer

const DWORD nCheckQuitInterval = 100; // Every 100 ms the consumer checks whether it should quit.

ConsumerRun(pQueue)
{
    while(!ShouldQuit())
    {
        Item* pCurrentItem = NULL;
        EnterCriticalSection(pQueue->pCritSec);
            if(IsQueueEmpty(pQueue))
            {
                ResetEvent(pQueue->hEvent);
            }
            else
            {
                pCurrentItem = RemoveFromQueue(pQueue);
            }
        LeaveCriticalSection(pQueue->pCritSec);

        if(pCurrentItem){
            ProcessItem(pCurrentItem);
            pCurrentItem = NULL;
        }
        else
        {
            // Wait for items to be added.
            WaitForSingleObject(pQueue->hEvent, nCheckQuitInterval);
        }

    }
}

Notes:

  • The event is a manual-reset event.
  • The operations protected by the critical section are quick. The event is only set or reset when the queue transitions to/from empty state. It has to be set/reset within the critical section to avoid a race condition.
  • This means the critical section is only held for a short time, so contention will be rare.
  • Critical sections don't block unless they are contended. So context switches will be rare.
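
As a rough portable illustration of the notes above, the same "signal only when the queue goes from empty to non-empty" rule can be sketched in standard C++. This is an assumption-laden stand-in, not the Win32 code itself: `std::mutex` plays the role of the critical section, a `std::condition_variable` with a predicate plays the role of the manual-reset event, and `WorkQueue`, `add` and `remove` are hypothetical names. It assumes a single consumer, as in the question.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>

class WorkQueue {
public:
    // Producer side. Returns true if this call woke the consumer,
    // which only happens when the queue was empty beforehand.
    bool add(int item) {
        std::lock_guard<std::mutex> lock(mutex_);
        const bool wasEmpty = items_.empty();
        items_.push_back(item);
        // Notify while holding the lock, mirroring the "set the event inside
        // the critical section" rule. With one consumer, a waiter can only
        // exist while the queue is empty, so other pushes need no notify.
        if (wasEmpty) nonEmpty_.notify_one();
        return wasEmpty;
    }

    // Consumer side. Waits up to `timeout` for an item; returns false on
    // timeout so the caller can check whether it should quit.
    bool remove(int& item, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        const bool ready = nonEmpty_.wait_for(
            lock, timeout, [this] { return !items_.empty(); });
        if (!ready) return false;
        assert(!items_.empty());
        item = items_.front();
        items_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;                  // plays the role of the critical section
    std::condition_variable nonEmpty_;  // plays the role of the event
    std::deque<int> items_;
};
```

A consumer loop would call `remove(item, std::chrono::milliseconds(100))` repeatedly and check its quit condition whenever the call returns false, which corresponds to the 100 ms `WaitForSingleObject` timeout in the pseudocode. The predicate check under the lock replaces the explicit reset of the event when the queue is found empty.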

Assumptions:

  • This is a real problem not homework.
  • Producers and consumers spend most of their time doing other stuff, i.e. getting the items ready for the queue, processing them after removing them from the queue.
  • If they are spending most of the time doing the actual queue operations, you shouldn't be using a queue. I hope that is obvious.
answered Sep 18 '22 07:09 by Ben