Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Concurrent blocking queue in C++11

For message passing in between threads, I'm looking for a concurrent queue with following properties:

  • bounded size
  • pop method that blocks/waits until an element is available.
  • abort method to cancel the wait
  • Optional: priority

Multiple producers, one consumer.

The concurrent_bounded_queue of TBB would provide that, but I'm looking for alternatives to avoid the additional dependency of TBB.

The application uses C++11 and boost. I couldn't find anything suitable in boost. What are the options?

like image 516
jcm Avatar asked Jul 25 '13 09:07

jcm


People also ask

What is concurrent queue?

A concurrent queue is basically a queue which provides protection against multiple threads mutating its state and thus causing inconsistencies. A naive way to implement a concurrent queue may be to just slap locks in its enqueue and dequeue functions when they try to modify the head and tail.

Is Concurrentlinkedqueue thread safe?

An unbounded thread-safe queue based on linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time.

What is a BlockingQueue?

BlockingQueue is a java Queue that support operations that wait for the queue to become non-empty when retrieving and removing an element, and wait for space to become available in the queue when adding an element.

What is a lock free queue?

Lock-free queue is a queue applying to concurrency but without locking. When using lock-free queue, slow or stopped processes do not prevent other processes from accessing data in it. Lock-free queue has two main interfaces just like normal queue: Enqueue.


1 Answers

Naive implementation using Boost library(circular_buffer) and C++11 standard library.

#include <mutex>
#include <condition_variable>
#include <boost/circular_buffer.hpp>

struct operation_aborted {};

template <class T, std::size_t N>
class bound_queue {
public:
  typedef T value_type;
  bound_queue() : q_(N), aborted_(false) {}
  void push(value_type data)
  {
    std::unique_lock<std::mutex> lk(mtx_);
    cv_pop_.wait(lk, [=]{ return !q_.full() || aborted_; });
    if (aborted_) throw operation_aborted();
    q_.push_back(data);
    cv_push_.notify_one();
  }
  value_type pop()
  {
    std::unique_lock<std::mutex> lk(mtx_);
    cv_push_.wait(lk, [=]{ return !q_.empty() || aborted_; });
    if (aborted_) throw operation_aborted();
    value_type result = q_.front();
    q_.pop_front();
    cv_pop_.notify_one();
    return result;
  }
  void abort()
  {
    std::lock_guard<std::mutex> lk(mtx_);
    aborted_ = true;
    cv_pop_.notify_all();
    cv_push_.notify_all(); 
  }
private:
  boost::circular_buffer<value_type> q_;
  bool aborted_;
  std::mutex mtx_;
  std::condition_variable cv_push_;
  std::condition_variable cv_pop_;
};
like image 187
yohjp Avatar answered Oct 03 '22 01:10

yohjp