Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Single producer, single consumer data structure with double buffer in C++

I have an application at $work where I have to move between two real-time threads that are scheduled at different frequencies. (The actual scheduling is beyond my control.) The application is hard real-time-ish (one of the threads has to drive a hardware interface), so the data transfer between the threads should be lock-free and wait-free to the extent possible.

It is important to note that only one block of data needs to be transferred: because the two threads run at different rates, there will be times when two iterations of the faster thread are completed between two wakeups of the slower thread; in this case it is OK to overwrite the data in the write buffer so that the slower thread gets only the latest data.

In other words, instead of a queue a double buffered solution suffices. The two buffers are allocated during initialization, and the reader and write threads can call methods of the class to get pointers to one of these buffers.

C++ code:

#include <mutex>  template <typename T> class ProducerConsumerDoubleBuffer { public:     ProducerConsumerDoubleBuffer() {         m_write_busy = false;         m_read_idx = m_write_idx = 0;     }      ~ProducerConsumerDoubleBuffer() { }      // The writer thread using this class must call     // start_writing() at the start of its iteration     // before doing anything else to get the pointer     // to the current write buffer.     T * start_writing(void) {         std::lock_guard<std::mutex> lock(m_mutex);          m_write_busy = true;         m_write_idx = 1 - m_read_idx;          return &m_buf[m_write_idx];     }     // The writer thread must call end_writing()     // as the last thing it does     // to release the write busy flag.     void end_writing(void) {         std::lock_guard<std::mutex> lock(m_mutex);          m_write_busy = false;     }      // The reader thread must call start_reading()     // at the start of its iteration to get the pointer     // to the current read buffer.     // If the write thread is not active at this time,     // the read buffer pointer will be set to the      // (previous) write buffer - so the reader gets the latest data.     // If the write buffer is busy, the read pointer is not changed.     // In this case the read buffer may contain stale data,     // it is up to the user to deal with this case.     T * start_reading(void) {         std::lock_guard<std::mutex> lock(m_mutex);          if (!m_write_busy) {             m_read_idx = m_write_idx;         }          return &m_buf[m_read_idx];     }     // The reader thread must call end_reading()     // at the end of its iteration.     void end_reading(void) {         std::lock_guard<std::mutex> lock(m_mutex);          m_read_idx = m_write_idx;     }  private:     T m_buf[2];     bool m_write_busy;     unsigned int m_read_idx, m_write_idx;     std::mutex m_mutex; }; 

To avoid stale data in the reader thread the payload structure is versioned. To facilitate bidirectional data transfer between the threads, two instances of the above monstrosity are used, in opposite directions.

Questions:

  • Is this scheme threadsafe? If it's broken, where?
  • Can it be done without the mutex? Perhaps with just memory barriers or CAS instructions?
  • Can it be made better?
like image 618
user3638506 Avatar asked May 14 '14 22:05

user3638506


People also ask

What is double buffering in C?

The double buffer is a memory area which is not directly mapped to the screen. When we want to make some change to the scene, we first modify this area, and then copy the whole area to the window. This way, a screen refresh may not take place while we are modifying the area.

What is single and double buffering?

There are two buffers in the system. One buffer is used by the driver or controller to store data while waiting for it to be taken by higher level of the hierarchy. Other buffer is used to store data from the lower level module. Double buffering is also known as buffer swapping.

What is double buffering where it is used and why?

Double buffering is a term used to describe a device with two buffers. The usage of multiple buffers increases the overall throughput of a device and helps prevents bottlenecks. For example, with graphics, double buffering can show one image or frame while a separate frame is being buffered to be shown next.


2 Answers

Very interesting problem! Way trickier than I first thought :-) I like lock-free solutions, so I've tried to work one out below.

There are many ways to think about this system. You can model it as a fixed-size circular buffer/queue (with two entries), but then you lose the ability to update the next available value for consumption, since you don't know if the consumer has started to read the most recently published value or is still (potentially) reading the previous one. So extra state is needed beyond that of a standard ring buffer in order to reach a more optimal solution.

First note that there is always a cell that the producer can safely write to at any given point in time; if one cell is being read by the consumer, the other can be written to. Let's call the cell that can be safely written to the "active" cell (the cell that can be potentially read from is whatever cell isn't the active one). The active cell can only be switched if the other cell is not currently being read from.

Unlike the active cell, which can always be written to, the non-active cell can only be read from if it contains a value; once that value is consumed, it's gone. (This means that livelock is avoided in the case of an aggressive producer; at some point, the consumer will have emptied a cell and will stop touching the cells. Once that happens, the producer can definitely publish a value, whereas before that point, it can only publish a value (change the active cell) if the consumer is not in the middle of a read.)

If there is a value that's ready to be consumed, only the consumer can change that fact (for the non-active cell, anyway); subsequent productions may change which cell is active and the published value, but a value will always be ready to be read until it's consumed.

Once the producer is done writing to the active cell, it can "publish" this value by changing which cell is the active one (swapping the index), provided the consumer is not in the middle of reading the other cell. If the consumer is in the middle of reading the other cell, the swap cannot occur, but in that case the consumer can swap after it's done reading the value, provided the producer is not in the middle of a write (and if it is, the producer will swap once it's done). In fact, in general the consumer can always swap after it's done reading (if it's the only one accessing the system) because spurious swaps by the consumer are benign: if there is something in the other cell, then swapping will cause that to be read next, and if there isn't, swapping affects nothing.

So, we need a shared variable to track what the active cell is, and we also need a way for both the producer and consumer to indicate if they're in the middle of an operation. We can store these three pieces of state into one atomic variable in order to be able to affect them all at once (atomically). We also need a way for the consumer to check if there's anything in the non-active cell in the first place, and for both threads to modify that state as appropriate. I tried a few other approaches, but in the end the easiest was just to include this information in the other atomic variable too. This makes things much simpler to reason about, since all state changes in the system are atomic this way.

I've come up with a wait-free implementation (lock-free, and all operations complete in a bounded number of instructions).

Code time!

#include <atomic> #include <cstdint>  template <typename T> class ProducerConsumerDoubleBuffer { public:     ProducerConsumerDoubleBuffer() : m_state(0) { }     ~ProducerConsumerDoubleBuffer() { }      // Never returns nullptr     T* start_writing() {         // Increment active users; once we do this, no one         // can swap the active cell on us until we're done         auto state = m_state.fetch_add(0x2, std::memory_order_relaxed);         return &m_buf[state & 1];     }      void end_writing() {         // We want to swap the active cell, but only if we were the last         // ones concurrently accessing the data (otherwise the consumer         // will do it for us when *it's* done accessing the data)          auto state = m_state.load(std::memory_order_relaxed);         std::uint32_t flag = (8 << (state & 1)) ^ (state & (8 << (state & 1)));         state = m_state.fetch_add(flag - 0x2, std::memory_order_release) + flag - 0x2;         if ((state & 0x6) == 0) {             // The consumer wasn't in the middle of a read, we should             // swap (unless the consumer has since started a read or             // already swapped or read a value and is about to swap).             // If we swap, we also want to clear the full flag on what             // will become the active cell, otherwise the consumer could             // eventually read two values out of order (it reads a new             // value, then swaps and reads the old value while the             // producer is idle).             m_state.compare_exchange_strong(state, (state ^ 0x1) & ~(0x10 >> (state & 1)), std::memory_order_release);         }     }      // Returns nullptr if there appears to be no more data to read yet     T* start_reading() {         m_readState = m_state.load(std::memory_order_relaxed);         if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {             // Nothing to read here!             return nullptr;         }          // At this point, there is guaranteed to be something to         // read, because the full flag is never turned off by the         // producer thread once it's on; the only thing that could         // happen is that the active cell changes, but that can         // only happen after the producer wrote a value into it,         // in which case there's still a value to read, just in a         // different cell.          m_readState = m_state.fetch_add(0x2, std::memory_order_acquire) + 0x2;          // Now that we've incremented the user count, nobody can swap until         // we decrement it         return &m_buf[(m_readState & 1) ^ 1];     }      void end_reading() {         if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {             // There was nothing to read; shame to repeat this             // check, but if these functions are inlined it might             // not matter. Otherwise the API could be changed.             // Or just don't call this method if start_reading()             // returns nullptr -- then you could also get rid             // of m_readState.             return;         }          // Alright, at this point the active cell cannot change on         // us, but the active cell's flag could change and the user         // count could change. We want to release our user count         // and remove the flag on the value we read.          auto state = m_state.load(std::memory_order_relaxed);         std::uint32_t sub = (0x10 >> (state & 1)) | 0x2;         state = m_state.fetch_sub(sub, std::memory_order_relaxed) - sub;         if ((state & 0x6) == 0 && (state & (0x8 << (state & 1))) == 1) {             // Oi, we were the last ones accessing the data when we released our cell.             // That means we should swap, but only if the producer isn't in the middle             // of producing something, and hasn't already swapped, and hasn't already             // set the flag we just reset (which would mean they swapped an even number             // of times).  Note that we don't bother swapping if there's nothing to read             // in the other cell.             m_state.compare_exchange_strong(state, state ^ 0x1, std::memory_order_relaxed);         }     }  private:     T m_buf[2];      // The bottom (lowest) bit will be the active cell (the one for writing).     // The active cell can only be switched if there's at most one concurrent     // user. The next two bits of state will be the number of concurrent users.     // The fourth bit indicates if there's a value available for reading     // in m_buf[0], and the fifth bit has the same meaning but for m_buf[1].     std::atomic<std::uint32_t> m_state;      std::uint32_t m_readState; }; 

Note that the semantics are such that the consumer can never read a given value twice, and a value it does read is always newer than the last value it read. It's also fairly efficient in memory usage (two buffers, like your original solution). I avoided CAS loops because they're generally less efficient than a single atomic operation under contention.

If you decide use the above code, I suggest you write some comprehensive (threaded) unit tests for it first. And proper benchmarks. I did test it, but only just barely. Let me know if you find any bugs :-)

My unit test:

ProducerConsumerDoubleBuffer<int> buf; std::thread producer([&]() {     for (int i = 0; i != 500000; ++i) {         int* item = buf.start_writing();         if (item != nullptr) {      // Always true             *item = i;         }         buf.end_writing();     } }); std::thread consumer([&]() {     int prev = -1;     for (int i = 0; i != 500000; ++i) {         int* item = buf.start_reading();         if (item != nullptr) {             assert(*item > prev);             prev = *item;         }         buf.end_reading();     } }); producer.join(); consumer.join(); 

As for your original implementation, I only looked at it cursorily (it's much more fun to design new stuff, heh), but david.pfx's answer seems to address that part of your question.

like image 132
Cameron Avatar answered Sep 29 '22 13:09

Cameron


Yes, I think it's broken.

If the reader does a start/end/start in succession it will update its read index to the write index, and potentially read data from the write index, even if the write is busy.

The problem essentially is that the writer doesn't know which buffer the reader will use, so the writer should ensure that both buffers are valid at all times. It can't do that, if it's going to take any time to write data into a buffer [unless I misunderstood some of the logic that isn't shown here.]

Yes, I think it can be done without locks, using CAS or equivalent logic. I'm not going to try to express an algorithm in this space. I'm confident that it exists, but not that I can write it out correctly first time. And a bit of web searching turned up some plausible candidates. Wait-free IPC using CAS appears to be quite an interesting topic and the subject of some research.


After some further thought, the algorithm is as follows. You need:

  • 3 buffers: one for the writer, one for the reader to use, and one extra. The buffers are ordered: they form a ring (but see note).
  • A status for each buffer: free, full, writing, reading.
  • A function that can inspect the status of the buffer and conditionally change the status to a different value in a single atomic operation. I shall use CSET for that.

Writer:

Find the first buffer that is FREE or FULL   Fail: assert (should never fail, reader can only use one buffer)   CSET buffer to WRITING Write into the buffer CSET buffer to FULL 

Reader:

Find first buffer that is FULL     Fail: wait (writer may be slow)     CSET buffer to READING Read and consume buffer CSET buffer to FREE 

Note: This algorithm does not guarantee that buffers are treated strictly in order of arrival, and no simple change will make it do so. If this is important, the algorithm should be enhanced with a sequence number on the buffer, set by the writer so that the most recent buffer can be chosen by the reader.

I leave the code as an implementation detail.


The CSET function is non-trivial. It has to atomically test that a particular shared memory location is equal to an expected value and if so change it to a new value. It returns true if it successfully made the change and false otherwise. The implementation must avoid race conditions if two threads access the same location at the same time (and possibly on different processors).

The C++ standard atomic operations library contains a set of atomic_compare_exchange functions which should serve the purpose, if available.

like image 45
david.pfx Avatar answered Sep 29 '22 11:09

david.pfx