Lock-Free Multiple Producer/Consumer Queue in C++11

I'm trying to implement a lock-free multiple-producer, multiple-consumer queue in C++11. I'm doing this as a learning exercise, so I'm well aware that I could just use an existing open-source implementation, but I'd really like to find out why my code doesn't work. The data is stored in a ring buffer, so apparently it is a "bounded MPMC queue".

I've modelled it pretty closely on what I've read about Disruptor. It works absolutely fine with a single consumer and single or multiple producers; it's only multiple consumers that seem to break it.

Here's the queue:

template <typename T>
class Queue : public IQueue<T>
{
public:
    explicit Queue( int capacity );
    ~Queue();

    bool try_push( T value );
    bool try_pop( T& value );
private:
    typedef struct
    {
        bool readable;
        T value;
    } Item;

    std::atomic<int> m_head;
    std::atomic<int> m_tail;
    int m_capacity;
    Item* m_items;
};

template <typename T>
Queue<T>::Queue( int capacity ) :
m_head( 0 ),
m_tail( 0 ),
m_capacity(capacity),
m_items( new Item[capacity] )
{
    for( int i = 0; i < capacity; ++i )
    {
        m_items[i].readable = false;
    }
}

template <typename T>
Queue<T>::~Queue()
{
    delete[] m_items;
}

template <typename T>
bool Queue<T>::try_push( T value )
{
    while( true )
    {
        // See that there's room
        int tail = m_tail.load(std::memory_order_acquire);
        int new_tail = ( tail + 1 );
        int head = m_head.load(std::memory_order_acquire);

        if( ( new_tail - head ) >= m_capacity )
        {
            return false;
        }

        if( m_tail.compare_exchange_weak( tail, new_tail, std::memory_order_acq_rel ) )
        {
            // In try_pop, m_head is incremented before the reading of the value has completed,
            // so though we've acquired this slot, a consumer thread may be in the middle of reading
            tail %= m_capacity;

            std::atomic_thread_fence( std::memory_order_acquire );
            while( m_items[tail].readable )
            {
            }

            m_items[tail].value = value;
            std::atomic_thread_fence( std::memory_order_release );
            m_items[tail].readable = true;

            return true;
        }
    }
}

template <typename T>
bool Queue<T>::try_pop( T& value )
{
    while( true )
    {
        int head = m_head.load(std::memory_order_acquire);
        int tail = m_tail.load(std::memory_order_acquire);

        if( head == tail )
        {
            return false;
        }

        int new_head = ( head + 1 );

        if( m_head.compare_exchange_weak( head, new_head, std::memory_order_acq_rel ) )
        {
            head %= m_capacity;

            std::atomic_thread_fence( std::memory_order_acquire );
            while( !m_items[head].readable )
            {
            }

            value = m_items[head].value;
            std::atomic_thread_fence( std::memory_order_release );
            m_items[head].readable = false;

            return true;
        }
    }
}

And here's the test I'm using:

void Test( std::string name, Queue<int>& queue )
{
    const int NUM_PRODUCERS = 64;
    const int NUM_CONSUMERS = 2;
    const int NUM_ITERATIONS = 512;
    bool table[NUM_PRODUCERS*NUM_ITERATIONS];
    memset(table, 0, NUM_PRODUCERS*NUM_ITERATIONS*sizeof(bool));

    std::vector<std::thread> threads(NUM_PRODUCERS+NUM_CONSUMERS);

    std::chrono::system_clock::time_point start, end;
    start = std::chrono::system_clock::now();

    std::atomic<int> pop_count (NUM_PRODUCERS * NUM_ITERATIONS);
    std::atomic<int> push_count (0);

    for( int thread_id = 0; thread_id < NUM_PRODUCERS; ++thread_id )
    {
        threads[thread_id] = std::thread([&queue,thread_id,&push_count]()
                                 {
                                     int base = thread_id * NUM_ITERATIONS;

                                     for( int i = 0; i < NUM_ITERATIONS; ++i )
                                     {
                                         while( !queue.try_push( base + i ) ){};
                                         push_count.fetch_add(1);
                                     }
                                 });
    }

    for( int thread_id = 0; thread_id < ( NUM_CONSUMERS ); ++thread_id )
    {
        threads[thread_id+NUM_PRODUCERS] = std::thread([&]()
                                         {
                                             int v;

                                             while( pop_count.load() > 0 )
                                             {
                                                 if( queue.try_pop( v ) )
                                                 {
                                                     if( table[v] )
                                                     {
                                                         std::cout << v << " already set" << std::endl;
                                                     }
                                                     table[v] = true;
                                                     pop_count.fetch_sub(1);
                                                 }
                                             }
                                         });

    }

    for( int i = 0; i < ( NUM_PRODUCERS + NUM_CONSUMERS ); ++i )
    {
        threads[i].join();
    }

    end = std::chrono::system_clock::now();
    std::chrono::duration<double> duration = end - start;

    std::cout << name << " " << duration.count() << std::endl;

    std::atomic_thread_fence( std::memory_order_acq_rel );

    bool result = true;
    for( int i = 0; i < NUM_PRODUCERS * NUM_ITERATIONS; ++i )
    {
        if( !table[i] )
        {
            std::cout << "failed at " << i << std::endl;
            result = false;
        }
    }
    std::cout << name << " " << ( result? "success" : "fail" ) << std::endl;
}

Any nudging in the right direction would be greatly appreciated. I'm pretty new to memory fences rather than just using a mutex for everything, so I'm probably just fundamentally misunderstanding something.

Cheers J

Joe asked Sep 07 '14 11:09


2 Answers

I'd take a look at moodycamel's implementation (moodycamel::ConcurrentQueue).

It is a fast, general-purpose lock-free queue for C++, written entirely in C++11. The documentation seems rather good, and it comes with a few performance tests.

Among other interesting things (which are worth a read anyway), it is contained in a single header and is available under the simplified BSD license. Just drop it into your project and enjoy!

EnzoR answered Oct 07 '22 21:10


The simplest approach uses a circular buffer: for example, an array of 256 elements indexed by a uint8_t, so that the index wraps around to the beginning when it overflows.
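The wrap-around can be seen in a small sketch. The names `kSize` and `next_index` are made up for illustration; the only thing it relies on is that converting to an 8-bit unsigned type is done modulo 256.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical illustration: 256 slots indexed by an 8-bit unsigned integer.
constexpr std::size_t kSize = 256;

// Advance an index by one slot. The addition is done in int, and the
// conversion back to uint8_t is modulo 256, so 255 wraps around to 0.
inline std::uint8_t next_index(std::uint8_t i) {
    return static_cast<std::uint8_t>(i + 1);
}

// Store a value at the slot following `i` and return that slot's index.
inline std::uint8_t store_next(std::array<int, kSize>& buffer,
                               std::uint8_t i, int value) {
    const std::uint8_t slot = next_index(i);
    buffer[slot] = value;  // always in range: slot is 0..255
    return slot;
}
```

Because every possible index value is a valid slot, no explicit `% capacity` or bounds check is needed with this particular size.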

The simplest primitive you can build upon is a queue with a single producer thread and a single consumer thread.

The buffer has two heads:

  • Write head: It points to the element that will be written next.
  • Read head: It points to the element that will be read next.

Operation of the producer:

  1. If write head + 1 == read head, the buffer is full; return a buffer-full error.
  2. Write the content to the element.
  3. Insert a memory barrier to sync the CPU cores.
  4. Move the write head forward.

In the buffer-full case there is still one free slot left, but we reserve it to distinguish the full case from the buffer-empty case.
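A short sketch of the two checks, assuming the 256-slot buffer with uint8_t heads described earlier (the function names are illustrative). Without the reserved slot, both "full" and "empty" would look like read head == write head.

```cpp
#include <cassert>
#include <cstdint>

// Empty: the consumer has caught up with the producer.
inline bool is_empty(std::uint8_t read_head, std::uint8_t write_head) {
    return read_head == write_head;
}

// Full: writing one more element would make the heads equal,
// which would be indistinguishable from the empty state.
inline bool is_full(std::uint8_t read_head, std::uint8_t write_head) {
    return static_cast<std::uint8_t>(write_head + 1) == read_head;
}

// Number of elements currently stored (0..255), computed modulo 256.
inline unsigned used_slots(std::uint8_t read_head, std::uint8_t write_head) {
    return static_cast<std::uint8_t>(write_head - read_head);
}
```

With 256 slots, the usable capacity under this scheme is therefore 255 elements.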

Operation of the consumer:

  1. If read head == write head, the buffer is empty; return a buffer-empty error.
  2. Read the content of the element.
  3. Insert a memory barrier to sync the CPU cores.
  4. Move the read head forward.

The producer owns the write head and the consumer owns the read head, so each head has exactly one writer and there is no contention on them. Also, each head is updated only after the operation has completed; this ensures that the producer leaves fully written elements behind, and the consumer leaves fully consumed, empty cells behind.
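The producer and consumer steps above could be sketched in C++11 roughly as follows. This is only an illustration under some assumptions: the class name `SpscRing` is invented, the buffer is fixed at 256 slots, `T` must be default-constructible and copy-assignable, and the "memory barrier" step is expressed as a release store on the head paired with an acquire load on the other side.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstdint>

// Illustrative single-producer/single-consumer ring buffer (hypothetical name).
template <typename T>
class SpscRing {
public:
    // Call from the single producer thread only. Returns false when full.
    bool try_push(const T& value) {
        const std::uint8_t w = write_.load(std::memory_order_relaxed);
        const std::uint8_t next = static_cast<std::uint8_t>(w + 1);
        // Step 1: one slot is kept free so that "full" differs from "empty".
        if (next == read_.load(std::memory_order_acquire)) return false;
        // Step 2: write the element into the slot the producer owns.
        items_[w] = value;
        // Steps 3 and 4: the release store orders the write above before
        // the new head value becomes visible to the consumer.
        write_.store(next, std::memory_order_release);
        return true;
    }

    // Call from the single consumer thread only. Returns false when empty.
    bool try_pop(T& value) {
        const std::uint8_t r = read_.load(std::memory_order_relaxed);
        // Step 1: nothing to read if the heads are equal.
        if (r == write_.load(std::memory_order_acquire)) return false;
        // Step 2: read the element.
        value = items_[r];
        // Steps 3 and 4: hand the now-empty cell back to the producer.
        read_.store(static_cast<std::uint8_t>(r + 1), std::memory_order_release);
        return true;
    }

private:
    std::array<T, 256> items_{};
    std::atomic<std::uint8_t> write_{0};  // modified only by the producer
    std::atomic<std::uint8_t> read_{0};   // modified only by the consumer
};
```

Note that this only holds for exactly one producer and one consumer; with several threads on either side, the "owner" of a head is no longer unique and the scheme needs something more.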

Create two of these pipes, one in each direction, whenever you spawn a new thread, and you have bidirectional communication with your threads.
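As a rough sketch of the two-pipe idea: the `Pipe`, `Channel`, `worker` and `run_round_trips` names below are all hypothetical, the pipe is a compact single-producer/single-consumer ring fixed at 256 slots, and the sentinel value -1 is just one arbitrary way to tell the worker to stop.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <functional>
#include <thread>

// Compact single-producer/single-consumer pipe (illustrative, 256 slots).
template <typename T>
struct Pipe {
    T items[256] = {};
    std::atomic<std::uint8_t> write{0};
    std::atomic<std::uint8_t> read{0};

    bool try_push(const T& v) {  // producer side only
        const std::uint8_t w = write.load(std::memory_order_relaxed);
        const std::uint8_t next = static_cast<std::uint8_t>(w + 1);
        if (next == read.load(std::memory_order_acquire)) return false;  // full
        items[w] = v;
        write.store(next, std::memory_order_release);
        return true;
    }
    bool try_pop(T& v) {         // consumer side only
        const std::uint8_t r = read.load(std::memory_order_relaxed);
        if (r == write.load(std::memory_order_acquire)) return false;    // empty
        v = items[r];
        read.store(static_cast<std::uint8_t>(r + 1), std::memory_order_release);
        return true;
    }
};

// One pipe per direction: the parent produces requests and consumes
// replies; the worker consumes requests and produces replies.
struct Channel {
    Pipe<int> requests;
    Pipe<int> replies;
};

// Hypothetical worker: doubles each request until it sees the sentinel -1.
inline void worker(Channel& ch) {
    for (;;) {
        int x = 0;
        while (!ch.requests.try_pop(x)) std::this_thread::yield();
        if (x == -1) return;
        while (!ch.replies.try_push(2 * x)) std::this_thread::yield();
    }
}

// Parent side: send `count` requests, wait for each reply, return the sum.
inline long run_round_trips(int count) {
    Channel ch;
    std::thread t(worker, std::ref(ch));
    long sum = 0;
    for (int i = 0; i < count; ++i) {
        while (!ch.requests.try_push(i)) std::this_thread::yield();
        int reply = 0;
        while (!ch.replies.try_pop(reply)) std::this_thread::yield();
        sum += reply;
    }
    while (!ch.requests.try_push(-1)) std::this_thread::yield();
    t.join();
    return sum;
}
```

Each pipe still has exactly one producer and one consumer, which is what keeps the single-writer-per-head property intact in both directions.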

Since we are talking about lock-freedom, none of the threads ever block: when there is nothing to do, the threads spin idly. You may want to detect this and add some sleep when it happens.
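One possible way to do that is a small back-off helper that spins a few times, then yields, then sleeps. This is only a sketch: the `Backoff` class, the thresholds and the 50 microsecond sleep are arbitrary assumptions, not tuned values.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Arbitrary illustrative thresholds.
constexpr int kSpinLimit = 16;   // pure busy-wait for the first few attempts
constexpr int kYieldLimit = 64;  // then yield, and after this start sleeping

// Hypothetical helper: call pause() each time a poll finds nothing to do.
class Backoff {
public:
    void pause() {
        if (attempts_ >= kYieldLimit) {
            // Idle for a long time: give the CPU back for a while.
            std::this_thread::sleep_for(std::chrono::microseconds(50));
        } else {
            // Short waits: spin first, then let other threads run.
            if (attempts_ >= kSpinLimit) std::this_thread::yield();
            ++attempts_;  // saturates at kYieldLimit
        }
    }
    void reset() { attempts_ = 0; }  // call after useful work was done
    int attempts() const { return attempts_; }

private:
    int attempts_ = 0;
};

// Poll `ready` until it becomes true, backing off while it is false.
// Returns the back-off level reached (0 if it was ready immediately).
inline int wait_until_ready(const std::atomic<bool>& ready) {
    Backoff backoff;
    while (!ready.load(std::memory_order_acquire)) backoff.pause();
    return backoff.attempts();
}
```

The same `pause()` call could replace the empty loop body around a failed `try_push` or `try_pop`, with `reset()` after each success.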

Calmarius answered Oct 07 '22 20:10