I'm attempting to write a multi-producer, multi-consumer queue. I'm using G++ 4.6 on Arch Linux, it breaks on G++ 4.7 also.
#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <vector>
#include <iostream>
#include <string>
#include <sstream>
template <typename T> class concurrent_queue
{
public:
concurrent_queue(size_t n) : items(n), item_states(n), producer_index(0), consumer_index(0) {
}
virtual ~concurrent_queue () {
}
T * consume() {
auto index = consumer_index;
T * item = nullptr;
state_t state = USED;
// Extract item and block location.
while (!item_states[index].compare_exchange_strong(state, BLOCKED, std::memory_order_acquire, std::memory_order_acquire)) {
// Wait for an item to become available
std::unique_lock<std::mutex> lock(item_mutex);
has_items.wait(lock);
// Move to the newly available item (might already have been taken by another consumer).
index = consumer_index;
}
// Tell consumers to move to next location, wrap to beginning of circular buffer if necessary.
++consumer_index;
consumer_index %= items.size();
// Unblock index so that producers can write to it.
items[index] = nullptr;
return item;
}
void produce(T * value) {
items[producer_index] = value;
++producer_index;
producer_index %= items.size();
has_items.notify_one();
}
private:
typedef enum {
EMPTY,
USED,
BLOCKED
} state_t;
// Used as a buffer of items
std::vector<T* > items;
std::vector<std::atomic<state_t> > item_states;
size_t producer_index;
size_t consumer_index;
std::mutex item_mutex;
std::condition_variable has_items;
};
// Test code
using namespace std;
template <typename T> void pop_n_print(concurrent_queue<T> & queue) {
stringstream message;
message << "popped " << *queue.consume() << endl;
cout << message.str();
}
int main ()
{
concurrent_queue<int> ints(5);
ints.produce(new int(1));
ints.produce(new int(2));
pop_n_print(ints);
pop_n_print(ints);
return 0;
}
I compile this code with g++ --std=c++0x queue.cc -o test_queue
, but I get this error message:
/tmp/ccqCjADk.o: In function `concurrent_queue<int>::consume()':
queue.cc:(.text._ZN16concurrent_queueIiE7consumeEv[concurrent_queue<int>::consume()]+0x9f): undefined reference to `std::atomic<concurrent_queue<int>::state_t>::compare_exchange_strong(concurrent_queue<int>::state_t&, concurrent_queue<int>::state_t, std::memory_order, std::memory_order)'
collect2: ld returned 1 exit status
I'm not sure why this is occurring, but it seems to suggest that I need to link against something. Any ideas on where I'm going wrong? Any pointers are greatly appreciated.
Because std::atomic exist only for certain Ts, mainly numeric types.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With