
C++11 non-blocking producer/consumer

I have a C++11 application with a high-priority thread that's producing data, and a low-priority thread that's consuming it (in my case, writing it to disk). I'd like to make sure the high-priority producer thread is never blocked, i.e. it uses only lock-free algorithms.

With a lock-free queue, I can push data to the queue from the producer thread, and poll it from the consumer thread, thus meeting my goals above. I'd like to modify my program so that the consumer thread blocks when inactive instead of polling.

It seems like the C++11 condition variable might be useful to block the consumer thread. Can anyone show me an example of how to use it, while avoiding the possibility that the consumer sleeps with data still in the queue? More specifically, I want to make sure that the consumer is always woken up some finite time after the producer pushes the last item into the queue. It's also important that the producer remains non-blocking.

nonagon asked Jan 28 '14 19:01


1 Answer

> It seems like the C++11 condition variable might be useful to block the consumer thread. Can anyone show me an example of how to use it, while avoiding the possibility that the consumer sleeps with data still in the queue?

To use a condition variable you need a mutex and a condition. In your case the condition will be "there is data available in the queue". Since the producer uses lock-free updates to produce work, the consumer has to use the same form of synchronisation to consume it. The mutex is therefore not used to synchronise access to the queue at all. The consumer thread needs it only because there is no other way to wait on a std::condition_variable.

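#include <condition_variable>
#include <mutex>

// these variables are members or otherwise shared between threads
std::mutex m_mutex;
std::condition_variable m_cv;
lockfree_queue m_data;  // placeholder for whichever lock-free queue you use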

// ...

// in producer thread:
while (true)
{
  // add work to queue
  m_data.push(x);
  m_cv.notify_one();
}

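// in consumer thread:
while (true)
{
  std::unique_lock<std::mutex> lock(m_mutex);
  // The producer never locks m_mutex, so a notify can land after the predicate
  // sees an empty queue but before this thread sleeps. The timed wait (needs
  // <chrono>) bounds that delay; 10 ms is an arbitrary choice.
  // [&] rather than [] so the lambda can reach m_data when it is a member.
  if (!m_cv.wait_for(lock, std::chrono::milliseconds(10),
                     [&]{ return !m_data.empty(); }))
    continue;  // timed out with the queue still empty
  // remove data from queue and process it
  auto x = m_data.pop();
}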

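The wait call blocks only if the queue is empty when the predicate is checked. The condition variable might wake up spuriously, or because it was notified by the producer, but in either case the call returns with a true predicate (rather than sleeping again) only if the queue is non-empty. That's guaranteed by the predicate overloads of condition_variable::wait and wait_for, because the condition variable always re-checks the predicate for you after every wake-up.

One caveat: because the producer does not lock the mutex, it can push and notify in the window after the consumer has seen an empty queue but before the consumer is actually asleep, and that notification is lost. With an untimed wait the consumer then sleeps until the next notification. A timed wait (wait_for with the same predicate) bounds that delay without making the producer take the lock.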

Since the mutex is only used by the consumer thread, it could in fact be local to that thread. That holds only while you have a single consumer; with more than one, they all need to share the same mutex to wait on the same condvar.
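To make the single-consumer case concrete, here is a minimal sketch of the whole pipeline with the mutex declared inside the consumer thread. Everything named here is an assumption for illustration: SpscQueue is a hypothetical stand-in for lockfree_queue (a fixed-size ring that is safe for exactly one producer and one consumer), and run_demo, the capacity of 64 and the 10 ms timeout are arbitrary choices. The consumer uses a timed wait so that a notification arriving just before it goes to sleep costs at most one timeout period.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for lockfree_queue: a ring buffer for exactly one
// producer thread and one consumer thread. Holds at most N - 1 items.
template <typename T, std::size_t N>
class SpscQueue {
public:
  // Producer only. Returns false instead of blocking when the ring is full.
  bool try_push(const T& value) {
    const std::size_t tail = tail_.load(std::memory_order_relaxed);
    const std::size_t next = (tail + 1) % N;
    if (next == head_.load(std::memory_order_acquire)) return false;
    slots_[tail] = value;
    tail_.store(next, std::memory_order_release);
    return true;
  }
  // Consumer only. Returns false when the ring is empty.
  bool try_pop(T& out) {
    const std::size_t head = head_.load(std::memory_order_relaxed);
    if (head == tail_.load(std::memory_order_acquire)) return false;
    out = slots_[head];
    head_.store((head + 1) % N, std::memory_order_release);
    return true;
  }
  bool empty() const {
    return head_.load(std::memory_order_acquire) ==
           tail_.load(std::memory_order_acquire);
  }
private:
  std::array<T, N> slots_{};
  std::atomic<std::size_t> head_{0};  // next slot to read
  std::atomic<std::size_t> tail_{0};  // next slot to write
};

// Pushes 0..count-1 from the calling thread and returns what the consumer saw.
std::vector<int> run_demo(int count) {
  SpscQueue<int, 64> queue;
  std::condition_variable cv;
  std::atomic<bool> done{false};
  std::vector<int> received;

  std::thread consumer([&] {
    std::mutex mutex;  // local: only this thread ever locks it
    std::unique_lock<std::mutex> lock(mutex);
    for (;;) {
      // Timed wait: a missed notification delays us by at most 10 ms.
      cv.wait_for(lock, std::chrono::milliseconds(10),
                  [&] { return !queue.empty() || done.load(); });
      int value = 0;
      while (queue.try_pop(value)) received.push_back(value);
      if (done.load() && queue.empty()) break;
    }
  });

  for (int i = 0; i < count; ++i) {
    // Demo only: retry when the ring is full. A real producer would pick its
    // own policy (larger ring, dropping data) to stay non-blocking.
    while (!queue.try_push(i)) std::this_thread::yield();
    cv.notify_one();  // never touches the mutex
  }
  done.store(true);
  cv.notify_one();
  consumer.join();
  assert(received.size() == static_cast<std::size_t>(count));
  return received;
}
```

Calling run_demo(1000) should hand back the values 0 through 999 in order. The producer side only ever calls try_push and notify_one, so it never waits on the consumer's mutex.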

Jonathan Wakely answered Sep 20 '22 13:09