Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

C++ Equivalent to Java's BlockingQueue

I'm in the process of porting some Java code over to C++, and one particular section makes use of a BlockingQueue to pass messages from many producers to a single consumer.

If you are not familiar with what a Java BlockingQueue is, it is just a queue that has a hard capacity, which exposes thread safe methods to put() and take() from the queue. put() blocks if the queue is full, and take() blocks if the queue is empty. Also, timeout-sensitive versions of these methods are supplied.

Timeouts are relevant to my use-case, so a recommendation that supplies those is ideal. If not, I can code up some myself.

I've googled around and quickly browsed the Boost libraries and I'm not finding anything like this. Maybe I'm blind here...but does anyone know of a good recommendation?

Thanks!

like image 875
Ben Avatar asked Oct 09 '12 17:10

Ben


People also ask

Is Java BlockingQueue thread-safe?

BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control.

Is BlockingQueue concurrent?

concurrent. BlockingQueue , represents a queue which is thread safe to put elements into, and take elements out of from. In other words, multiple threads can be inserting and taking elements concurrently from a Java BlockingQueue , without any concurrency issues arising.

What is BlockingQueue?

BlockingQueue is a java Queue that support operations that wait for the queue to become non-empty when retrieving and removing an element, and wait for space to become available in the queue when adding an element.


1 Answers

It isn't fixed size and it doesn't support timeouts but here is a simple implementation of a queue I had posted recently using C++ 2011 constructs:

#include <mutex> #include <condition_variable> #include <deque>  template <typename T> class queue { private:     std::mutex              d_mutex;     std::condition_variable d_condition;     std::deque<T>           d_queue; public:     void push(T const& value) {         {             std::unique_lock<std::mutex> lock(this->d_mutex);             d_queue.push_front(value);         }         this->d_condition.notify_one();     }     T pop() {         std::unique_lock<std::mutex> lock(this->d_mutex);         this->d_condition.wait(lock, [=]{ return !this->d_queue.empty(); });         T rc(std::move(this->d_queue.back()));         this->d_queue.pop_back();         return rc;     } }; 

It should be trivial to extend and use a timed wait for popping. The main reason I haven't done it is that I'm not happy with the interface choices I have thought of so far.

like image 58
Dietmar Kühl Avatar answered Sep 21 '22 07:09

Dietmar Kühl