
Does a multiple-producer, single-consumer lock-free queue exist for C++? [closed]

The more I read the more confused I become... I would have thought it trivial to find a formally correct MPSC queue implemented in C++.

Every time I find another stab at it, further research seems to suggest there are issues such as ABA or other subtle race conditions.

Many talk of the need for garbage collection. This is something I want to avoid.

Is there an accepted correct open-source implementation out there?

Steve Lorimer asked Jan 18 '12 22:01


1 Answer

You may want to check the Disruptor: http://lmax-exchange.github.io/disruptor/ (the LMAX project at that address is written in Java, but C++ ports exist).

You can also find an explanation of how it works here on Stack Overflow. Basically, it is a circular buffer with no locking, optimized for passing FIFO messages between threads in fixed-size slots.
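To make the ring-buffer idea concrete, here is a minimal sketch of a bounded multi-producer, single-consumer ring in portable C++11. It is not the Disruptor's code and not the code from this answer; it follows the per-slot sequence number scheme commonly attributed to Dmitry Vyukov's bounded queue, simplified for one consumer. The name `mpsc_ring` and its interface are hypothetical. It assumes `T` is default-constructible and copy-assignable and that the size is a power of two.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>

// Hypothetical name; a sketch, not a library type.
template <typename T>
class mpsc_ring
{
  struct cell
  {
    std::atomic<std::size_t> seq; // tells producers and the consumer whose turn it is
    T value;
  };

public:
  // size must be a power of two so that (pos & mask_) wraps correctly
  explicit mpsc_ring(std::size_t size)
    : cells_(new cell[size]), mask_(size - 1), head_(0), tail_(0)
  {
    assert(size >= 2 && (size & (size - 1)) == 0);
    for (std::size_t i = 0; i != size; ++i)
      cells_[i].seq.store(i, std::memory_order_relaxed);
  }

  // may be called from any number of producer threads; returns false when full
  bool try_push(const T& v)
  {
    std::size_t pos = head_.load(std::memory_order_relaxed);
    for (;;)
    {
      cell& c = cells_[pos & mask_];
      const std::size_t seq = c.seq.load(std::memory_order_acquire);
      const std::intptr_t diff = (std::intptr_t)seq - (std::intptr_t)pos;
      if (diff == 0)
      {
        // the slot is free for this position: try to claim it
        if (head_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
        {
          c.value = v;
          c.seq.store(pos + 1, std::memory_order_release); // publish to the consumer
          return true;
        }
        // a failed CAS has reloaded pos; retry with the new position
      }
      else if (diff < 0)
        return false; // slot still holds an unread item from the previous lap: full
      else
        pos = head_.load(std::memory_order_relaxed); // another producer got ahead
    }
  }

  // must be called from one consumer thread only; returns false when empty
  bool try_pop(T& out)
  {
    cell& c = cells_[tail_ & mask_];
    if (c.seq.load(std::memory_order_acquire) != tail_ + 1)
      return false; // nothing published at this position yet
    out = c.value;
    c.seq.store(tail_ + mask_ + 1, std::memory_order_release); // free slot for next lap
    ++tail_;
    return true;
  }

private:
  std::unique_ptr<cell[]> cells_;
  const std::size_t mask_;
  std::atomic<std::size_t> head_; // next position to be claimed by a producer
  std::size_t tail_;              // next position to read; touched by the consumer only
};
```

Note that in this scheme the consumer cannot advance past a slot that a producer has claimed but not yet published, so a stalled producer delays delivery of later items. Producers never wait for each other, but the queue as a whole is not lock-free in the strict formal sense.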

Here are two implementations which I found useful: "Lock-free Multi-producer Multi-consumer Queue on Ring Buffer" @ NatSys Lab. Blog, and "Yet another implementation of a lock-free circular array queue" @ CodeProject.

NOTE: the code below is incorrect; I leave it only as an example of how tricky these things can be.

If you don't like the complexity of the Google version, here is something similar from me. It's much simpler, but I leave it as an exercise to the reader to make it work (it's part of a larger project and not portable at the moment). The whole idea is to maintain a circular buffer for data and a small set of counters that identify slots for writing/written and reading/read. Since each counter is in its own cache line, and (normally) each is atomically updated only once in the life of a message, they can all be read without any synchronisation.

There is one potential contention point between writing threads, in post_done; it is required for the FIFO guarantee. The counters (head_, wrtn_, rdng_, tail_) were selected to ensure correctness and FIFO ordering, so dropping FIFO would also require changing the counters (and that might be difficult to do without sacrificing correctness). It is possible to slightly improve performance for scenarios with one consumer, but I would not bother: you would have to undo it if other use cases with multiple readers are found.
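As a separate illustration of the counter layout only (this is not the wheel class, and it does not address the correctness problems mentioned in the note above), the four sequence counters can be sketched with standard C++11 facilities instead of `__declspec(align(64)) volatile`. The struct name and helper functions are hypothetical, and the 64-byte cache line size is an assumption carried over from the original code.

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical helper type: four sequence counters, one per 64-byte cache line.
struct wheel_counters
{
  alignas(64) std::atomic<long long> head{0}; // currently writing or written
  alignas(64) std::atomic<long long> wrtn{0}; // written
  alignas(64) std::atomic<long long> rdng{0}; // currently reading or read
  alignas(64) std::atomic<long long> tail{0}; // read

  // Each helper performs two independent loads, so under concurrency the
  // result is only an approximate snapshot.
  long long being_written() const { return head.load() - wrtn.load(); }
  long long being_read() const    { return rdng.load() - tail.load(); }
  long long ready() const         { return wrtn.load() - rdng.load(); }
  long long queued() const        { return head.load() - tail.load(); }
};

static_assert(alignof(wheel_counters) == 64, "counters start on a cache line boundary");
static_assert(sizeof(wheel_counters) == 4 * 64, "each counter occupies its own 64-byte line");
```

Keeping each counter on its own line means a producer updating `head` does not invalidate the line that consumers poll for `wrtn`, which is the effect the paragraph above relies on.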

On my machine the latency looks as follows (percentile on the left, mean within this percentile on the right, unit is microseconds, measured with rdtsc):

    total=1000000 samples, avg=0.24us
    50%=0.214us, avg=0.093us
    90%=0.23us, avg=0.151us
    99%=0.322us, avg=0.159us
    99.9%=15.566us, avg=0.173us
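For readers who want to produce rows in this shape from their own measurements, here is a small sketch. It assumes that "mean within this percentile" means the mean of all samples up to and including the percentile value, and it uses the nearest-rank definition of a percentile; both are my reading, not something the answer states. The names `percentile_row` and `summarize` are hypothetical, and how the samples are timed (rdtsc or otherwise) is left out.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstddef>
#include <numeric>
#include <vector>

struct percentile_row
{
  double value; // latency at the requested percentile
  double mean;  // mean of all samples up to and including that percentile
};

// Nearest-rank percentile. Samples are taken by value because they are sorted locally.
inline percentile_row summarize(std::vector<double> samples, double pct)
{
  assert(!samples.empty() && pct > 0.0 && pct <= 100.0);
  std::sort(samples.begin(), samples.end());
  const double n = static_cast<double>(samples.size());
  std::size_t rank = static_cast<std::size_t>(std::ceil(pct * n / 100.0));
  rank = std::min(std::max<std::size_t>(rank, 1), samples.size());
  const double sum = std::accumulate(samples.begin(), samples.begin() + rank, 0.0);
  return { samples[rank - 1], sum / static_cast<double>(rank) };
}
```

Calling `summarize(samples, 99.9)` would give the two numbers of the last row; the overall average in the first row is simply the mean of all samples.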

These results are for a single polling consumer, i.e. a worker thread calling wheel.read() in a tight loop and checking whether the returned slot is empty (scroll to the bottom for an example). Waiting consumers, which use much less CPU, would instead wait on the event through one of the acquire... functions; this adds about 1-2us to the average latency because of the context switch.
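The event type used for waiting consumers (`api::event`) is not shown in this answer. As a rough idea of what a portable stand-in could look like, here is a sketch built on `std::mutex` and `std::condition_variable`. The class name is hypothetical, and the auto-reset behaviour and millisecond timeout are assumptions; the real `api::event` may differ.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for api::event: an auto-reset event.
class auto_reset_event
{
public:
  // signal that a new message was posted and wake one waiter
  void set()
  {
    {
      std::lock_guard<std::mutex> lock(m_);
      signaled_ = true;
    }
    cv_.notify_one();
  }

  // block until signalled, then consume the signal
  void acquire()
  {
    std::unique_lock<std::mutex> lock(m_);
    cv_.wait(lock, [this] { return signaled_; });
    signaled_ = false;
  }

  // consume the signal if present, without blocking
  bool try_acquire()
  {
    std::lock_guard<std::mutex> lock(m_);
    const bool was_signaled = signaled_;
    signaled_ = false;
    return was_signaled;
  }

  // wait up to timeout_ms milliseconds (assumed unit) for the signal
  bool try_acquire(unsigned long timeout_ms)
  {
    std::unique_lock<std::mutex> lock(m_);
    if (!cv_.wait_for(lock, std::chrono::milliseconds(timeout_ms),
                      [this] { return signaled_; }))
      return false;
    signaled_ = false;
    return true;
  }

private:
  std::mutex m_;
  std::condition_variable cv_;
  bool signaled_ = false;
};
```

A consumer that finds the queue empty would call `try_acquire(10)` and then poll again, trading the extra wake-up latency mentioned above for lower CPU use.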

Since there is very little contention on read, consumers scale very well with the number of worker threads, e.g. for 3 threads on my machine:

    total=1500000 samples, avg=0.07us
    50%=0us, avg=0us
    90%=0.155us, avg=0.016us
    99%=0.361us, avg=0.038us
    99.9%=8.723us, avg=0.044us

Patches will be welcome :)

    // Copyright (c) 2011-2012, Bronislaw (Bronek) Kozicki
    //
    // Distributed under the Boost Software License, Version 1.0. (See accompanying
    // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

    #pragma once

    #include <core/api.hxx>
    #include <core/wheel/exception.hxx>

    #include <boost/noncopyable.hpp>
    #include <boost/type_traits.hpp>
    #include <boost/lexical_cast.hpp>
    #include <typeinfo>

    namespace core { namespace wheel
    {
      struct bad_size : core::exception
      {
        template<typename T> explicit bad_size(const T&, size_t m)
          : core::exception(std::string("Slot capacity exceeded, sizeof(")
                      + typeid(T).name()
                      + ") = "
                      + boost::lexical_cast<std::string>(sizeof(T))
                      + ", capacity = "
                      + boost::lexical_cast<std::string>(m)
                      )
        {}
      };

      // inspired by Disruptor
      template <typename Header>
      class wheel : boost::noncopyable
      {
        __declspec(align(64))
        struct slot_detail
        {
          // slot write: (memory barrier in wheel) > post_done > (memory barrier in wheel)
          // slot read:  (memory barrier in wheel) > read_done > (memory barrier in wheel)

          // done writing or reading, must update wrtn_ or tail_ in wheel, as appropriate
          template <bool Writing>
          void done(wheel* w)
          {
            if (Writing)
              w->post_done(sequence);
            else
              w->read_done();
          }

          // cache line for sequence number and header
          long long sequence;
          Header header;

          // there is no such thing as data type with variable size, but we need it to avoid thrashing
          // cache - so we invent one. The memory is reserved in runtime and we simply go beyond last element.
          // This is well into UB territory! Using template parameter for this is not good, since it
          // results in this small implementation detail leaking to all possible user interfaces.
          __declspec(align(8))
          char data[8];
        };

        // use this as a storage space for slot_detail, to guarantee 64 byte alignment
        __declspec(align(64))
        struct slot_block { long long padding[8]; };

      public:
        // wrap slot data to outside world
        template <bool Writable>
        class slot
        {
          template<typename> friend class wheel;

          slot& operator=(const slot&); // moveable but non-assignable

          // may only be constructed by wheel
          slot(slot_detail* impl, wheel<Header>* w, size_t c)
            : slot_(impl) , wheel_(w) , capacity_(c)
          {}

        public:
          slot(slot&& s)
            : slot_(s.slot_) , wheel_(s.wheel_) , capacity_(s.capacity_)
          {
            s.slot_ = NULL;
          }

          ~slot()
          {
            if (slot_)
            {
              slot_->done<Writable>(wheel_);
            }
          }

          // slot accessors - use Header to store information on what type is actually stored in data
          bool empty() const          { return !slot_; }
          long long sequence() const  { return slot_->sequence; }
          Header& header()            { return slot_->header; }
          char* data()                { return slot_->data; }

          template <typename T> T& cast()
          {
            static_assert(boost::is_pod<T>::value, "Data type must be POD");
            if (sizeof(T) > capacity_)
              throw bad_size(T(), capacity_);
            if (empty())
              throw no_data();
            return *((T*) data());
          }

        private:
          slot_detail*    slot_;
          wheel<Header>*  wheel_;
          const size_t    capacity_;
        };

      private:
        // dynamic size of slot, with extra capacity, expressed in 64 byte blocks
        static size_t sizeof_slot(size_t s)
        {
          size_t m = sizeof(slot_detail);
          // add capacity less 8 bytes already within sizeof(slot_detail)
          m += max(8, s) - 8;
          // round up to 64 bytes, i.e. alignment of slot_detail
          size_t r = m & ~(unsigned int)63;
          if (r < m)
            r += 64;
          r /= 64;
          return r;
        }

        // calculate actual slot capacity back from number of 64 byte blocks
        static size_t slot_capacity(size_t s)
        {
          return s*64 - sizeof(slot_detail) + 8;
        }

        // round up to power of 2
        static size_t round_size(size_t s)
        {
          // enforce minimum size
          if (s <= min_size)
            return min_size;

          // find rounded value
          --s;
          size_t r = 1;
          while (s)
          {
            s >>= 1;
            r <<= 1;
          };
          return r;
        }

        slot_detail& at(long long sequence)
        {
          // find index from sequence number and return slot at found index of the wheel
          return *((slot_detail*) &wheel_[(sequence & (size_ - 1)) * blocks_]);
        }

      public:
        wheel(size_t capacity, size_t size)
          : head_(0) , wrtn_(0) , rdng_(0) , tail_(0) , event_()
          , blocks_(sizeof_slot(capacity)) , capacity_(slot_capacity(blocks_)) , size_(round_size(size))
        {
          static_assert(boost::is_pod<Header>::value, "Header type must be POD");
          static_assert(sizeof(slot_block) == 64, "This was unexpected");

          wheel_ = new slot_block[size_ * blocks_];
          // all slots must be initialised to 0
          memset(wheel_, 0, size_ * 64 * blocks_);
          active_ = 1;
        }

        ~wheel()
        {
          stop();
          delete[] wheel_;
        }

        // all accessors needed
        size_t capacity() const { return capacity_; }   // capacity of a single slot
        size_t size() const     { return size_; }       // number of slots available
        size_t queue() const    { return (size_t)head_ - (size_t)tail_; }
        bool active() const     { return active_ == 1; }

        // enough to call it just once, to fine tune slot capacity
        template <typename T>
        void check() const
        {
          static_assert(boost::is_pod<T>::value, "Data type must be POD");
          if (sizeof(T) > capacity_)
            throw bad_size(T(), capacity_);
        }

        // stop the wheel - safe to execute many times
        size_t stop()
        {
          InterlockedExchange(&active_, 0);
          // must wait for current read to complete
          while (rdng_ != tail_)
            Sleep(10);

          return size_t(head_ - tail_);
        }

        // return first available slot for write
        slot<true> post()
        {
          if (!active_)
            throw stopped();

          // the only memory barrier on head seq. number we need, if not overflowing
          long long h = InterlockedIncrement64(&head_);
          while(h - (long long) size_ > tail_)
          {
            if (InterlockedDecrement64(&head_) == h - 1)
              throw overflowing();

            // protection against case of race condition when we are overflowing
            // and two or more threads try to post and two or more messages are read,
            // all at the same time. If this happens we must re-try, otherwise we
            // could have skipped a sequence number - causing infinite wait in post_done
            Sleep(0);
            h = InterlockedIncrement64(&head_);
          }

          slot_detail& r = at(h);
          r.sequence = h;

          // wrap in writeable slot
          return slot<true>(&r, this, capacity_);
        }

        // return first available slot for write, nothrow variant
        slot<true> post(std::nothrow_t)
        {
          if (!active_)
            return slot<true>(NULL, this, capacity_);

          // the only memory barrier on head seq. number we need, if not overflowing
          long long h = InterlockedIncrement64(&head_);
          while(h - (long long) size_ > tail_)
          {
            if (InterlockedDecrement64(&head_) == h - 1)
              return slot<true>(NULL, this, capacity_);

            // must retry if race condition described above
            Sleep(0);
            h = InterlockedIncrement64(&head_);
          }

          slot_detail& r = at(h);
          r.sequence = h;

          // wrap in writeable slot
          return slot<true>(&r, this, capacity_);
        }

        // read first available slot for read
        slot<false> read()
        {
          slot_detail* r = NULL;
          // compare rdng_ and wrtn_ early to avoid unnecessary memory barrier
          if (active_ && rdng_ < wrtn_)
          {
            // the only memory barrier on reading seq. number we need
            const long long h = InterlockedIncrement64(&rdng_);
            // check if this slot has been written, step back if not
            if (h > wrtn_)
              InterlockedDecrement64(&rdng_);
            else
              r = &at(h);
          }

          // wrap in readable slot
          return slot<false>(r , this, capacity_);
        }

        // waiting for new post, to be used by non-polling clients
        void acquire()
        {
          event_.acquire();
        }

        bool try_acquire()
        {
          return event_.try_acquire();
        }

        bool try_acquire(unsigned long timeout)
        {
          return event_.try_acquire(timeout);
        }

        void release()
        {}

      private:
        void post_done(long long sequence)
        {
          const long long t = sequence - 1;

          // the only memory barrier on written seq. number we need
          while(InterlockedCompareExchange64(&wrtn_, sequence, t) != t)
            Sleep(0);

          // this is outside of critical path for polling clients
          event_.set();
        }

        void read_done()
        {
          // the only memory barrier on tail seq. number we need
          InterlockedIncrement64(&tail_);
        }

        // each in its own cache line
        // head_ - wrtn_ = no. of messages being written at this moment
        // rdng_ - tail_ = no. of messages being read at the moment
        // head_ - tail_ = no. of messages to read (including those being written and read)
        // wrtn_ - rdng_ = no. of messages to read (excluding those being written or read)
        __declspec(align(64)) volatile long long head_; // currently writing or written
        __declspec(align(64)) volatile long long wrtn_; // written
        __declspec(align(64)) volatile long long rdng_; // currently reading or read
        __declspec(align(64)) volatile long long tail_; // read
        __declspec(align(64)) volatile long active_;    // flag switched to 0 when stopped

        __declspec(align(64))
        api::event event_;          // set when new message is posted
        const size_t blocks_;       // number of 64-byte blocks in a single slot_detail
        const size_t capacity_;     // capacity of data() section per single slot. Initialisation depends on blocks_
        const size_t size_;         // number of slots available, always power of 2
        slot_block* wheel_;
      };
    }}

Here is what a polling consumer worker thread may look like:

    while (wheel.active())
    {
      core::wheel::wheel<int>::slot<false> slot = wheel.read();
      if (!slot.empty())
      {
        Data& d = slot.cast<Data>();
        // do work
      }
      // uncomment below for waiting consumer, saving CPU cycles
      // else
      //   wheel.try_acquire(10);
    }

Edit: added consumer example.

bronekk answered Sep 20 '22 10:09
