A project I'm working on uses multiple threads to do work on a collection of files. Each thread can add files to the list of files to be processed, so I put together (what I thought was) a thread-safe queue. Relevant portions follow:
    // qMutex is a std::mutex intended to guard the queue
    // populatedNotifier is a std::condition_variable intended to
    // notify waiting threads of a new item in the queue

    void FileQueue::enqueue(std::string&& filename)
    {
        std::lock_guard<std::mutex> lock(qMutex);
        q.push(std::move(filename));

        // Notify anyone waiting for additional files that more have arrived
        populatedNotifier.notify_one();
    }

    std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
    {
        std::unique_lock<std::mutex> lock(qMutex);
        if (q.empty()) {
            if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::no_timeout) {
                std::string ret = q.front();
                q.pop();
                return ret;
            }
            else {
                return std::string();
            }
        }
        else {
            std::string ret = q.front();
            q.pop();
            return ret;
        }
    }
However, I am occasionally segfaulting inside the if (...wait_for(lock, timeout) == std::cv_status::no_timeout) { } block, and inspection in gdb indicates that the segfaults are occurring because the queue is empty. How is this possible? It was my understanding that wait_for only returns cv_status::no_timeout when it has been notified, and this should only happen after FileQueue::enqueue has just pushed a new item to the queue.
A thread-safe queue in C++ lets multiple threads use the same queue concurrently. The standard library does not provide one as a built-in class, but it can be implemented with standard components such as std::queue, std::mutex, and std::condition_variable.
Modern C++: Writing a thread-safe Queue
The STL provides a high-performance queue class in std::queue<T>; however, the API is not very intuitive or easy to use, and it is not safe to access from multiple threads.
Here your shared data is the pointer to the queue. So, in general, any time you operate on the queue you need to protect it and prevent multiple threads from reaching it at the same time. One good way is to pair a mutex with a condition variable: the mutex guards the shared data, and the condition variable lets a thread sleep until another thread signals that the data may have changed.
The STL does not provide any guarantees for thread safety. This is especially the case when modifying the same container from multiple threads. The implementation of the STL that you're using may provide some level of thread safety, but you would need to look at the documentation for your implementation.
It is best to make the condition (monitored by your condition variable) the inverse of a while-loop's test: while(!some_condition). The loop body runs only while the condition is false, and inside it you go to sleep on the condition variable.
This way, if your thread is awoken--possibly spuriously--your loop will still check the condition before proceeding. Think of the condition as the state of interest, and think of the condition variable as more of a signal from the system that this state might be ready. The loop will do the heavy lifting of actually confirming that it's true, and going to sleep if it's not.
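Applied to the timed dequeue from the question, the same idea can be sketched with the predicate overload of wait_for, which re-checks the condition after every wakeup and returns the predicate's final value. A no_timeout result from the plain overload only says the thread woke before the deadline; the wakeup may have been spurious, or another consumer may have taken the item first. The class body below is a hypothetical reconstruction: only the names q, qMutex, populatedNotifier, enqueue and dequeue come from the question, and returning an empty string on timeout mirrors the original code.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <utility>

// Hypothetical reconstruction of the question's FileQueue class.
class FileQueue {
public:
    void enqueue(std::string&& filename) {
        std::lock_guard<std::mutex> lock(qMutex);
        q.push(std::move(filename));
        // Wake one waiting consumer, if any.
        populatedNotifier.notify_one();
    }

    // Returns an empty string if the queue is still empty after `timeout`.
    std::string dequeue(const std::chrono::milliseconds& timeout) {
        std::unique_lock<std::mutex> lock(qMutex);
        // wait_for returns false only if the timeout expired and the
        // predicate is still false, i.e. the queue is still empty.
        if (!populatedNotifier.wait_for(lock, timeout,
                                        [this] { return !q.empty(); })) {
            return std::string();
        }
        // The lock is held and the queue is known to be non-empty here.
        std::string ret = std::move(q.front());
        q.pop();
        return ret;
    }

private:
    std::queue<std::string> q;
    std::mutex qMutex;
    std::condition_variable populatedNotifier;
};
```

Because the predicate is also evaluated before the first wait, the separate if (q.empty()) branch from the question is no longer needed.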
I just wrote a template for an async queue, hope this helps. Here, q.empty() is the inverse condition of what we want: for the queue to have something in it. So it serves as the check for the while loop.
    #ifndef SAFE_QUEUE
    #define SAFE_QUEUE

    #include <queue>
    #include <mutex>
    #include <condition_variable>

    // A thread-safe queue.
    template <class T>
    class SafeQueue
    {
    public:
        SafeQueue(void)
            : q()
            , m()
            , c()
        {}

        ~SafeQueue(void)
        {}

        // Add an element to the queue.
        void enqueue(T t)
        {
            std::lock_guard<std::mutex> lock(m);
            q.push(t);
            c.notify_one();
        }

        // Get the "front" element.
        // If the queue is empty, wait until an element is available.
        T dequeue(void)
        {
            std::unique_lock<std::mutex> lock(m);
            while (q.empty())
            {
                // Release the lock for the duration of the wait and reacquire it afterwards.
                c.wait(lock);
            }
            T val = q.front();
            q.pop();
            return val;
        }

    private:
        std::queue<T> q;
        mutable std::mutex m;
        std::condition_variable c;
    };

    #endif
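To see a queue like this in use, here is a small producer/consumer sketch. The queue class is repeated in compact form only so the snippet compiles on its own, and it uses the predicate form of wait, which is equivalent to the while loop. The function sum_through_queue and its parameters are hypothetical names made up for this illustration.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Compact copy of the SafeQueue idea so this snippet is self-contained.
template <class T>
class SafeQueue {
public:
    void enqueue(T t) {
        std::lock_guard<std::mutex> lock(m);
        q.push(std::move(t));
        c.notify_one();
    }

    T dequeue() {
        std::unique_lock<std::mutex> lock(m);
        // Equivalent to: while (q.empty()) c.wait(lock);
        c.wait(lock, [this] { return !q.empty(); });
        T val = std::move(q.front());
        q.pop();
        return val;
    }

private:
    std::queue<T> q;
    std::mutex m;
    std::condition_variable c;
};

// Hypothetical demo: `producers` threads each push the values 1..perProducer,
// and the calling thread pops exactly that many items and sums them.
long long sum_through_queue(int producers, int perProducer) {
    SafeQueue<int> queue;
    std::vector<std::thread> threads;
    for (int p = 0; p < producers; ++p) {
        threads.emplace_back([&queue, perProducer] {
            for (int i = 1; i <= perProducer; ++i) {
                queue.enqueue(i);
            }
        });
    }

    long long total = 0;
    for (int n = 0; n < producers * perProducer; ++n) {
        total += queue.dequeue();  // blocks until an item is available
    }

    for (auto& t : threads) {
        t.join();
    }
    return total;
}
```

With 4 producers pushing 1..100 each, sum_through_queue(4, 100) returns 20200 regardless of how the threads interleave, since every pushed item is popped exactly once.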