
C++11 thread-safe queue

A project I'm working on uses multiple threads to do work on a collection of files. Each thread can add files to the list of files to be processed, so I put together (what I thought was) a thread-safe queue. Relevant portions follow:

    // qMutex is a std::mutex intended to guard the queue
    // populatedNotifier is a std::condition_variable intended to
    //                   notify waiting threads of a new item in the queue

    void FileQueue::enqueue(std::string&& filename)
    {
        std::lock_guard<std::mutex> lock(qMutex);
        q.push(std::move(filename));

        // Notify anyone waiting for additional files that more have arrived
        populatedNotifier.notify_one();
    }

    std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
    {
        std::unique_lock<std::mutex> lock(qMutex);
        if (q.empty()) {
            if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::no_timeout) {
                std::string ret = q.front();
                q.pop();
                return ret;
            }
            else {
                return std::string();
            }
        }
        else {
            std::string ret = q.front();
            q.pop();
            return ret;
        }
    }

However, I am occasionally segfaulting inside the if (...wait_for(lock, timeout) == std::cv_status::no_timeout) { } block, and inspection in gdb indicates that the segfaults are occurring because the queue is empty. How is this possible? It was my understanding that wait_for only returns cv_status::no_timeout when it has been notified, and this should only happen after FileQueue::enqueue has just pushed a new item to the queue.

Matt Kline asked Mar 07 '13 17:03




1 Answer

It is best to use the state you are waiting for (the one your condition variable signals) as the negated condition of a while loop: while(!some_condition). As long as the condition does not hold, the loop body runs, and inside that body the thread goes to sleep on the condition variable.

This way, if your thread is awoken--possibly spuriously--your loop will still check the condition before proceeding. Think of the condition as the state of interest, and think of the condition variable as more of a signal from the system that this state might be ready. The loop will do the heavy lifting of actually confirming that it's true, and going to sleep if it's not.
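As a minimal sketch of that loop applied to the timed dequeue from the question: the predicate overload of std::condition_variable::wait_for re-checks the condition after every wakeup and returns the predicate's final value, so neither a spurious wakeup nor a notification whose item was already taken by another consumer can reach q.front() on an empty queue. The class layout below is an assumption reconstructed from the question's snippet; only the member names q, qMutex and populatedNotifier come from the question.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <utility>

// Hypothetical reconstruction of the question's FileQueue class.
class FileQueue {
public:
    void enqueue(std::string&& filename) {
        std::lock_guard<std::mutex> lock(qMutex);
        q.push(std::move(filename));
        // Wake one waiting consumer; it re-checks the queue itself.
        populatedNotifier.notify_one();
    }

    // Returns an empty string if no file arrives before the timeout.
    std::string dequeue(const std::chrono::milliseconds& timeout) {
        std::unique_lock<std::mutex> lock(qMutex);
        // Behaves like a while(q.empty()) loop with a deadline: the
        // predicate is tested before sleeping and after every wakeup.
        // The result is false only if the queue is still empty at timeout.
        if (!populatedNotifier.wait_for(lock, timeout,
                                        [this] { return !q.empty(); })) {
            return std::string();
        }
        std::string ret = std::move(q.front());
        q.pop();
        return ret;
    }

private:
    std::queue<std::string> q;
    std::mutex qMutex;
    std::condition_variable populatedNotifier;
};
```

With this shape the queue is only touched when the predicate has just been confirmed under the lock, which is the property the original if-based version was missing.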

I just wrote a template for an async queue, hope this helps. Here, q.empty() is the inverse condition of what we want: for the queue to have something in it. So it serves as the check for the while loop.

    #ifndef SAFE_QUEUE
    #define SAFE_QUEUE

    #include <queue>
    #include <mutex>
    #include <condition_variable>

    // A thread-safe queue.
    template <class T>
    class SafeQueue
    {
    public:
      SafeQueue(void)
        : q()
        , m()
        , c()
      {}

      ~SafeQueue(void)
      {}

      // Add an element to the queue.
      void enqueue(T t)
      {
        std::lock_guard<std::mutex> lock(m);
        q.push(t);
        c.notify_one();
      }

      // Get the "front" element.
      // If the queue is empty, wait until an element is available.
      T dequeue(void)
      {
        std::unique_lock<std::mutex> lock(m);
        while(q.empty())
        {
          // wait() releases the lock while sleeping and reacquires it afterwards.
          c.wait(lock);
        }
        T val = q.front();
        q.pop();
        return val;
      }

    private:
      std::queue<T> q;
      mutable std::mutex m;
      std::condition_variable c;
    };
    #endif
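For illustration, here is one way the template might be used with a producer and a consumer thread. The function name sum_via_queue and the use of 0 as an end-of-stream sentinel are assumptions made for this sketch, not part of the template; a condensed copy of SafeQueue is repeated so the example compiles on its own.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Condensed copy of the SafeQueue template so this example is self-contained.
template <class T>
class SafeQueue {
public:
    void enqueue(T t) {
        std::lock_guard<std::mutex> lock(m);
        q.push(t);
        c.notify_one();
    }

    T dequeue() {
        std::unique_lock<std::mutex> lock(m);
        while (q.empty()) {
            c.wait(lock);  // sleeps without holding the lock
        }
        T val = q.front();
        q.pop();
        return val;
    }

private:
    std::queue<T> q;
    std::mutex m;
    std::condition_variable c;
};

// Hypothetical demo: the producer pushes 1..n followed by a 0 sentinel,
// and the consumer adds up everything it receives until the sentinel.
int sum_via_queue(int n) {
    SafeQueue<int> queue;
    int total = 0;

    std::thread consumer([&] {
        for (int v = queue.dequeue(); v != 0; v = queue.dequeue()) {
            total += v;
        }
    });
    std::thread producer([&] {
        for (int i = 1; i <= n; ++i) {
            queue.enqueue(i);
        }
        queue.enqueue(0);  // tell the consumer to stop
    });

    producer.join();
    consumer.join();  // total is safe to read after the join
    return total;
}
```

Because dequeue blocks until an item exists, the consumer needs some explicit signal to stop; the sentinel is just one simple option for a sketch like this.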
ChewOnThis_Trident answered Sep 22 '22 19:09