For message passing in between threads, I'm looking for a concurrent queue with following properties:
Multiple producers, one consumer.
The concurrent_bounded_queue
of TBB would provide that, but I'm looking for alternatives to avoid the additional dependency of TBB.
The application uses C++11 and boost. I couldn't find anything suitable in boost. What are the options?
A concurrent queue is basically a queue which provides protection against multiple threads mutating its state and thus causing inconsistencies. A naive way to implement a concurrent queue may be to just slap locks in its enqueue and dequeue functions when they try to modify the head and tail.
An unbounded thread-safe queue based on linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time.
BlockingQueue is a java Queue that support operations that wait for the queue to become non-empty when retrieving and removing an element, and wait for space to become available in the queue when adding an element.
Lock-free queue is a queue applying to concurrency but without locking. When using lock-free queue, slow or stopped processes do not prevent other processes from accessing data in it. Lock-free queue has two main interfaces just like normal queue: Enqueue.
Naive implementation using Boost library(circular_buffer) and C++11 standard library.
#include <mutex>
#include <condition_variable>
#include <boost/circular_buffer.hpp>
struct operation_aborted {};
template <class T, std::size_t N>
class bound_queue {
public:
typedef T value_type;
bound_queue() : q_(N), aborted_(false) {}
void push(value_type data)
{
std::unique_lock<std::mutex> lk(mtx_);
cv_pop_.wait(lk, [=]{ return !q_.full() || aborted_; });
if (aborted_) throw operation_aborted();
q_.push_back(data);
cv_push_.notify_one();
}
value_type pop()
{
std::unique_lock<std::mutex> lk(mtx_);
cv_push_.wait(lk, [=]{ return !q_.empty() || aborted_; });
if (aborted_) throw operation_aborted();
value_type result = q_.front();
q_.pop_front();
cv_pop_.notify_one();
return result;
}
void abort()
{
std::lock_guard<std::mutex> lk(mtx_);
aborted_ = true;
cv_pop_.notify_all();
cv_push_.notify_all();
}
private:
boost::circular_buffer<value_type> q_;
bool aborted_;
std::mutex mtx_;
std::condition_variable cv_push_;
std::condition_variable cv_pop_;
};
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With