
C++11 lockless queue using std::atomic (multi writer, single consumer)

I've written a simple lockless (lock-free) queue using the new std::atomic in C++11. The test program below crashes, and I can't see what I'm doing wrong.

#include <atomic>

template<typename T>
class lockless_queue
{
public:
    template<typename DataType>
    struct node
    {
        node(const DataType& data)
          : data(data), next(nullptr) {}
        DataType data;
        node* next;
    };

    lockless_queue()
      : head_(nullptr) {}

    void produce(const T &data)
    {
        node<T>* new_node = new node<T>(data);
        // put the current value of head into new_node->next
        new_node->next = head_.load(std::memory_order_relaxed);
        // now make new_node the new head, but if the head
        // is no longer what's stored in new_node->next
        // (some other thread must have inserted a node just now)
        // then put that new head into new_node->next and try again
        while(!std::atomic_compare_exchange_weak_explicit(
            &head_,
            &new_node->next,
            new_node,
            std::memory_order_release,
            std::memory_order_relaxed)) {}
    }

    node<T>* consume_all()
    {
        // Reset queue and return head atomically
        return head_.exchange(nullptr, std::memory_order_consume);
    }
private:
    std::atomic<node<T>*> head_;
};

// main.cpp
#include <iostream>

int main()
{
    lockless_queue<int> s;
    s.produce(1);
    s.produce(2);
    s.produce(3);
    auto head = s.consume_all();
    while (head)
    {
        auto tmp = head->next;
        std::cout << tmp->data << std::endl;
        delete head;
        head = tmp;
    }
}

And my output:

2
1
Segmentation fault (core dumped)

Can I have some pointers on where to look, or an indication of what I could be doing wrong?

Thanks!

Amir Taaki asked Dec 20 '13 18:12


2 Answers

You are dereferencing tmp instead of head:

    while (head)
    {
        auto tmp = head->next;
        std::cout << tmp->data << std::endl;
        delete head;
        head = tmp;
    }

should be:

    while (head)
    {
        std::cout << head->data << std::endl;
        auto tmp = head->next;
        delete head;
        head = tmp;
    }

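This is why 3 never appears in your output: each iteration prints the data of the next node rather than the current one, so the first node (3) is skipped. On the last iteration head->next is nullptr, so tmp->data dereferences a null pointer, and that is the segmentation fault.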
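As a rough sketch of the corrected traversal, the loop can be wrapped in a small helper that collects the values instead of printing them. The name drain is hypothetical and not part of the question's code; the class is a compact restatement of the question's container, and memory_order_acquire is used in consume_all as a conservative stand-in for the question's memory_order_consume (my assumption, not something the question requires).

```cpp
#include <atomic>
#include <cassert>
#include <vector>

// Compact restatement of the question's container (same push/exchange logic).
template <typename T>
class lockless_queue {
public:
    struct node {
        explicit node(const T& d) : data(d), next(nullptr) {}
        T data;
        node* next;
    };

    void produce(const T& data) {
        node* n = new node(data);
        n->next = head_.load(std::memory_order_relaxed);
        // A failed CAS stores the current head into n->next, so the body is empty.
        while (!head_.compare_exchange_weak(n->next, n,
                                            std::memory_order_release,
                                            std::memory_order_relaxed)) {}
    }

    node* consume_all() {
        // Assumption: acquire instead of the question's consume.
        return head_.exchange(nullptr, std::memory_order_acquire);
    }

private:
    std::atomic<node*> head_{nullptr};
};

// Hypothetical helper: detaches the list, copies each value, frees each node.
template <typename T>
std::vector<T> drain(lockless_queue<T>& q) {
    std::vector<T> out;
    auto* head = q.consume_all();
    while (head) {
        out.push_back(head->data);  // read the current node first
        auto* next = head->next;    // save the successor before deleting
        delete head;
        head = next;
    }
    return out;
}
```

After produce(1), produce(2), produce(3), drain returns the values newest-first (3, 2, 1), because every node is pushed at the head. If the consumer needs insertion order, reversing the result (or the detached list) is one option.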

Casey answered Oct 16 '22 17:10


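There is another point to watch once you start performing concurrent enqueues. If your compare_exchange_weak_explicit fails, either another thread managed to change the head pointer or the weak CAS failed spuriously, and new_node->next must hold the current head before you try the CAS again. The standard compare-exchange already takes care of this: on failure it writes the value it observed into the expected argument, which here is new_node->next, so the empty loop body in the question is correct as written. The explicit re-load below is therefore redundant but harmless, and some readers find it makes the intent clearer: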

    while(!std::atomic_compare_exchange_weak_explicit(
        &head_,
        &new_node->next,
        new_node,
        std::memory_order_release,
        std::memory_order_relaxed)) {
        new_node->next = head_.load(std::memory_order_relaxed);
    }
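
The behaviour of a failed compare-exchange can be checked in isolation. The following is a minimal single-threaded sketch; cas_result, try_swap and demo_failed_cas_refreshes_expected are hypothetical names introduced only for illustration, and compare_exchange_strong is used instead of the weak form so that the outcome is deterministic (the weak form may also fail spuriously).

```cpp
#include <atomic>
#include <cassert>

// Hypothetical result type: whether the CAS succeeded and what `expected` holds afterwards.
struct cas_result {
    bool succeeded;
    int* expected_after;
};

// Attempts to swing head from `expected` to `desired`.
// `expected` is taken by value so the caller can inspect the updated copy.
inline cas_result try_swap(std::atomic<int*>& head, int* expected, int* desired) {
    bool ok = head.compare_exchange_strong(expected, desired,
                                           std::memory_order_release,
                                           std::memory_order_relaxed);
    return {ok, expected};
}

inline void demo_failed_cas_refreshes_expected() {
    int a = 1, b = 2, c = 3;
    std::atomic<int*> head{&a};

    // Stale guess: head is &a, but we claim it is &b.
    cas_result r = try_swap(head, &b, &c);
    assert(!r.succeeded);            // the comparison fails
    assert(r.expected_after == &a);  // expected was overwritten with the real head
    assert(head.load() == &a);       // head itself is unchanged

    // Retrying with the refreshed value succeeds without a separate load().
    r = try_swap(head, r.expected_after, &c);
    assert(r.succeeded);
    assert(head.load() == &c);
}
```

In produce, the expected argument is new_node->next itself, so a failed attempt leaves the node already linked to the current head and the loop can retry immediately.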

Ken P answered Oct 16 '22 18:10