I'm trying to implement a lock-free, multiple-producer, multiple-consumer queue in C++11. I'm doing this as a learning exercise, so I'm well aware that I could just use an existing open-source implementation, but I'd really like to find out why my code doesn't work. The data is stored in a ring buffer, so I believe it counts as a "bounded MPMC queue".
I've modelled it pretty closely on what I've read about Disruptor. It works absolutely fine with a single consumer and single or multiple producers; it's only multiple consumers that seem to break it.
Here's the queue:
template <typename T>
class Queue : public IQueue<T>
{
public:
    explicit Queue( int capacity );
    ~Queue();
    bool try_push( T value );
    bool try_pop( T& value );
private:
    typedef struct
    {
        bool readable;
        T value;
    } Item;
    std::atomic<int> m_head;
    std::atomic<int> m_tail;
    int m_capacity;
    Item* m_items;
};

template <typename T>
Queue<T>::Queue( int capacity ) :
    m_head( 0 ),
    m_tail( 0 ),
    m_capacity(capacity),
    m_items( new Item[capacity] )
{
    for( int i = 0; i < capacity; ++i )
    {
        m_items[i].readable = false;
    }
}

template <typename T>
Queue<T>::~Queue()
{
    delete[] m_items;
}
template <typename T>
bool Queue<T>::try_push( T value )
{
    while( true )
    {
        // See that there's room
        int tail = m_tail.load(std::memory_order_acquire);
        int new_tail = ( tail + 1 );
        int head = m_head.load(std::memory_order_acquire);
        if( ( new_tail - head ) >= m_capacity )
        {
            return false;
        }
        if( m_tail.compare_exchange_weak( tail, new_tail, std::memory_order_acq_rel ) )
        {
            // In try_pop, m_head is incremented before the reading of the value has completed,
            // so though we've acquired this slot, a consumer thread may be in the middle of reading
            tail %= m_capacity;
            std::atomic_thread_fence( std::memory_order_acquire );
            while( m_items[tail].readable )
            {
            }
            m_items[tail].value = value;
            std::atomic_thread_fence( std::memory_order_release );
            m_items[tail].readable = true;
            return true;
        }
    }
}
template <typename T>
bool Queue<T>::try_pop( T& value )
{
    while( true )
    {
        int head = m_head.load(std::memory_order_acquire);
        int tail = m_tail.load(std::memory_order_acquire);
        if( head == tail )
        {
            return false;
        }
        int new_head = ( head + 1 );
        if( m_head.compare_exchange_weak( head, new_head, std::memory_order_acq_rel ) )
        {
            head %= m_capacity;
            std::atomic_thread_fence( std::memory_order_acquire );
            while( !m_items[head].readable )
            {
            }
            value = m_items[head].value;
            std::atomic_thread_fence( std::memory_order_release );
            m_items[head].readable = false;
            return true;
        }
    }
}
And here's the test I'm using:
void Test( std::string name, Queue<int>& queue )
{
    const int NUM_PRODUCERS = 64;
    const int NUM_CONSUMERS = 2;
    const int NUM_ITERATIONS = 512;
    bool table[NUM_PRODUCERS*NUM_ITERATIONS];
    memset(table, 0, NUM_PRODUCERS*NUM_ITERATIONS*sizeof(bool));
    std::vector<std::thread> threads(NUM_PRODUCERS+NUM_CONSUMERS);
    std::chrono::system_clock::time_point start, end;
    start = std::chrono::system_clock::now();
    std::atomic<int> pop_count (NUM_PRODUCERS * NUM_ITERATIONS);
    std::atomic<int> push_count (0);
    for( int thread_id = 0; thread_id < NUM_PRODUCERS; ++thread_id )
    {
        threads[thread_id] = std::thread([&queue,thread_id,&push_count]()
        {
            int base = thread_id * NUM_ITERATIONS;
            for( int i = 0; i < NUM_ITERATIONS; ++i )
            {
                while( !queue.try_push( base + i ) ){};
                push_count.fetch_add(1);
            }
        });
    }
    for( int thread_id = 0; thread_id < ( NUM_CONSUMERS ); ++thread_id )
    {
        threads[thread_id+NUM_PRODUCERS] = std::thread([&]()
        {
            int v;
            while( pop_count.load() > 0 )
            {
                if( queue.try_pop( v ) )
                {
                    if( table[v] )
                    {
                        std::cout << v << " already set" << std::endl;
                    }
                    table[v] = true;
                    pop_count.fetch_sub(1);
                }
            }
        });
    }
    for( int i = 0; i < ( NUM_PRODUCERS + NUM_CONSUMERS ); ++i )
    {
        threads[i].join();
    }
    end = std::chrono::system_clock::now();
    std::chrono::duration<double> duration = end - start;
    std::cout << name << " " << duration.count() << std::endl;
    std::atomic_thread_fence( std::memory_order_acq_rel );
    bool result = true;
    for( int i = 0; i < NUM_PRODUCERS * NUM_ITERATIONS; ++i )
    {
        if( !table[i] )
        {
            std::cout << "failed at " << i << std::endl;
            result = false;
        }
    }
    std::cout << name << " " << ( result? "success" : "fail" ) << std::endl;
}
Any nudging in the right direction would be greatly appreciated. I'm pretty new to memory fences rather than just using a mutex for everything, so I'm probably just fundamentally misunderstanding something.
Cheers J
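A possible explanation, offered as a sketch rather than a confirmed diagnosis: the single `readable` flag cannot tell one lap of the ring from the next. If a consumer stalls after claiming a slot but before clearing `readable`, a second consumer can claim the same slot one lap later, see the stale `readable == true`, and return the old value again, which would match the "already set" output. `readable` is also a plain `bool` touched by several threads, which is a data race in C++11. One common way around both problems is to give every slot an atomic sequence number, in the style of Dmitry Vyukov's bounded MPMC queue. The class below is a minimal illustration of that idea, not a drop-in fix for the code above. The names `SeqQueue` and `Cell` are made up here, and it assumes a capacity of at least 2 and position counters that never overflow.

```cpp
#include <atomic>
#include <cstddef>
#include <memory>

// Illustrative sketch only: SeqQueue and Cell are hypothetical names.
// Assumes capacity >= 2 and that the position counters never wrap around.
template <typename T>
class SeqQueue
{
public:
    explicit SeqQueue( std::size_t capacity ) :
        m_cells( new Cell[capacity] ),
        m_capacity( capacity ),
        m_head( 0 ),
        m_tail( 0 )
    {
        // Slot i is initially writable by the producer holding ticket i.
        for( std::size_t i = 0; i < capacity; ++i )
            m_cells[i].seq.store( i, std::memory_order_relaxed );
    }

    bool try_push( const T& value )
    {
        std::size_t pos = m_tail.load( std::memory_order_relaxed );
        for( ;; )
        {
            Cell& cell = m_cells[pos % m_capacity];
            std::size_t seq = cell.seq.load( std::memory_order_acquire );
            std::ptrdiff_t diff = static_cast<std::ptrdiff_t>( seq )
                                - static_cast<std::ptrdiff_t>( pos );
            if( diff == 0 )
            {
                // Slot is free for this ticket; try to claim the ticket.
                if( m_tail.compare_exchange_weak( pos, pos + 1, std::memory_order_relaxed ) )
                {
                    cell.value = value;
                    // Publish: readable by the consumer holding ticket pos.
                    cell.seq.store( pos + 1, std::memory_order_release );
                    return true;
                }
                // On failure pos now holds the current tail; retry.
            }
            else if( diff < 0 )
            {
                return false; // slot still holds last lap's value: queue is full
            }
            else
            {
                pos = m_tail.load( std::memory_order_relaxed ); // another producer got here first
            }
        }
    }

    bool try_pop( T& value )
    {
        std::size_t pos = m_head.load( std::memory_order_relaxed );
        for( ;; )
        {
            Cell& cell = m_cells[pos % m_capacity];
            std::size_t seq = cell.seq.load( std::memory_order_acquire );
            std::ptrdiff_t diff = static_cast<std::ptrdiff_t>( seq )
                                - static_cast<std::ptrdiff_t>( pos + 1 );
            if( diff == 0 )
            {
                if( m_head.compare_exchange_weak( pos, pos + 1, std::memory_order_relaxed ) )
                {
                    value = cell.value;
                    // Hand the slot to the producer one lap ahead.
                    cell.seq.store( pos + m_capacity, std::memory_order_release );
                    return true;
                }
            }
            else if( diff < 0 )
            {
                return false; // nothing published for this ticket yet: queue is empty
            }
            else
            {
                pos = m_head.load( std::memory_order_relaxed ); // another consumer got here first
            }
        }
    }

private:
    struct Cell
    {
        std::atomic<std::size_t> seq;
        T value;
    };
    std::unique_ptr<Cell[]> m_cells;
    std::size_t m_capacity;
    std::atomic<std::size_t> m_head;
    std::atomic<std::size_t> m_tail;
};
```

Because a consumer only accepts a slot whose sequence number equals its own ticket plus one, a value left over from an earlier lap can never be mistaken for the current one. Neither operation spins on a slot either: each simply reports "full" or "empty".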
I'd take a look at moodycamel's implementation (moodycamel::ConcurrentQueue).
It is a fast general purpose lock-free queue for C++ entirely written in C++11. Documentation seems to be rather good along with a few performance tests.
Among all other interesting things (they're worth a read anyway), it's all contained in a single header, and available under the simplified BSD license. Just drop it in your project and enjoy!
The simplest approach uses a circular buffer. That is, it's like an array of 256 elements, and you use a uint8_t as the index so that it wraps around and starts again at the beginning when it overflows.
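As a small illustration of that wrap-around, here is a hypothetical helper (`next_index` is not from any library) that advances an index into a 256-slot buffer. It relies only on the fact that converting to an unsigned 8-bit type reduces the value modulo 256.

```cpp
#include <cstdint>

// Hypothetical helper: advance an index into a 256-element ring.
// i + 1 is computed as int, then truncated back to 8 bits, so 255 wraps to 0.
inline std::uint8_t next_index( std::uint8_t i )
{
    return static_cast<std::uint8_t>( i + 1 );
}
```

No explicit modulo or bounds check is needed as long as the buffer has exactly 256 slots.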
The simplest primitive you can build upon is the case of a single producer thread and a single consumer thread.
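The buffer has two heads:
- Write head: points to where the next item will be added.
- Read head: points to where the next item will be taken.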
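Operation of the producer:
- If write head + 1 == read head, the buffer is full; return a "buffer full" error.
- Write the content into the element at the write head.
- Insert a memory barrier so that the content is visible before the head moves.
- Move the write head forward.
In the buffer-full case there is still one free slot left, but we reserve it to distinguish a full buffer from an empty one.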
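Operation of the consumer:
- If read head == write head, the buffer is empty; return a "buffer empty" error.
- Read the content of the element at the read head.
- Insert a memory barrier so that the read completes before the head moves.
- Move the read head forward.
The producer owns the write head and the consumer owns the read head, so each head has exactly one writer and there is no contention on either. Each head is updated only after the operation has completed, which ensures that the producer leaves finished elements behind and the consumer leaves behind fully consumed, empty cells.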
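The producer and consumer operations described above can be sketched roughly as follows. This is only one way to write it: the class name `SpscRing` is invented for illustration, the memory barriers are expressed as acquire/release operations on the two heads, and it assumes exactly one producer thread, exactly one consumer thread, and a default-constructible `T`.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical single-producer, single-consumer ring with 256 slots.
// One slot is always kept free so that "full" and "empty" look different.
template <typename T>
class SpscRing
{
public:
    // Call from the producer thread only.
    bool try_push( const T& value )
    {
        const std::uint8_t w = m_write.load( std::memory_order_relaxed );
        const std::uint8_t next = static_cast<std::uint8_t>( w + 1 );
        if( next == m_read.load( std::memory_order_acquire ) )
        {
            return false; // buffer full
        }
        m_cells[w] = value;
        // Release: the element is written before the head is seen to move.
        m_write.store( next, std::memory_order_release );
        return true;
    }

    // Call from the consumer thread only.
    bool try_pop( T& value )
    {
        const std::uint8_t r = m_read.load( std::memory_order_relaxed );
        if( r == m_write.load( std::memory_order_acquire ) )
        {
            return false; // buffer empty
        }
        value = m_cells[r];
        // Release: the element is read before the slot is handed back.
        m_read.store( static_cast<std::uint8_t>( r + 1 ), std::memory_order_release );
        return true;
    }

private:
    T m_cells[256];
    std::atomic<std::uint8_t> m_write{ 0 };
    std::atomic<std::uint8_t> m_read{ 0 };
};
```

With 256 slots and one reserved, the ring holds at most 255 elements at a time.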
Create two of these pipes, one in each direction, whenever you fork off a new thread, and you have bidirectional communication with your threads.
Because this is lock-free, none of the threads ever block: when there is nothing to do, they spin idly. You may want to detect this and add some sleep when it happens.
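One way to detect idle spinning and back off is a small counter that yields for a while and then starts sleeping. This is a sketch under arbitrary assumptions: the name `Backoff`, the limit of 64 yields and the 50 microsecond sleep are all made up for illustration and would need tuning.

```cpp
#include <chrono>
#include <thread>

// Hypothetical idle helper: yield for the first few misses, then sleep.
class Backoff
{
public:
    // Call after a failed try_push / try_pop.
    void idle()
    {
        if( m_misses < kSpinLimit )
        {
            ++m_misses;
            std::this_thread::yield();
        }
        else
        {
            std::this_thread::sleep_for( std::chrono::microseconds( 50 ) );
        }
    }

    // Call after a successful operation.
    void reset() { m_misses = 0; }

    // True once the helper has switched from yielding to sleeping.
    bool sleeping() const { return m_misses >= kSpinLimit; }

private:
    static constexpr int kSpinLimit = 64; // arbitrary threshold
    int m_misses = 0;
};
```

A consumer loop would call `idle()` whenever a pop fails and `reset()` whenever one succeeds, trading a little latency for much lower CPU use while the queue is empty.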