For signal processing on big arrays (10^7 elements), I use different threads connected with ring buffers. Sadly, too much time is spent just copying the data into and out of the buffer. The current implementation is based on boost::lockfree::spsc_queue.
So I am searching for a solution that swaps ownership of the vectors between the threads and the buffer by using unique_ptr to the vectors (please see the attached drawing: swapping pointers between the threads and the queue).
Moving smart pointers doesn't fit my needs, because then I would have to constantly allocate memory at runtime for new vector elements. That overhead is bigger than copying the data around.
Am I missing a flaw in that design?
Are there thread-safe or even lock-free ring buffer implementations allowing swap operations for push and pop?
Edit: I modified a locking ring buffer to swap unique_ptr instead. The performance boost is huge, though it doesn't feel like an elegant solution. Any recommendations?
// https://github.com/embeddedartistry/embedded-resources/blob/master/examples/cpp/circular_buffer.cpp
#include <memory>
#include <mutex>
// Fixed-capacity ring buffer that transfers ownership of pre-allocated
// objects by std::swap-ing std::unique_ptr<T>s, so push/pop never copy the
// payload and never allocate after construction.
// Adapted from:
// https://github.com/embeddedartistry/embedded-resources/blob/master/examples/cpp/circular_buffer.cpp
//
// Fixes vs. the original:
//  - empty()/full()/size() now take the mutex (they previously read
//    head_/tail_/full_ unsynchronized, a data race when polled from the
//    other thread);
//  - removed the dead "if (full_) tail_ = ..." branch in push() (push
//    already returns false when full, so it could never execute);
//  - full_ is initialized with `false`, not `0`.
template <typename T, int SIZE>
class RingbufferPointer {
    static_assert(SIZE > 0, "ring buffer capacity must be positive");
    typedef std::unique_ptr<T> TPointer;
public:
    // Pre-allocates all SIZE elements up front; this is the only place the
    // buffer allocates.
    explicit RingbufferPointer() {
        for (int i = 0; i < SIZE; i++) {
            buf_[i] = std::make_unique<T>();
        }
    }
    // Swaps `item` into the slot at head_. On success `item` receives the
    // slot's previous (recycled) object, so the caller keeps a valid buffer
    // to fill next. Returns false (and leaves `item` untouched) when full.
    bool push(TPointer &item) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (full_)
            return false;
        std::swap(buf_[head_], item);
        head_ = (head_ + 1) % max_size_;
        full_ = head_ == tail_;
        return true;
    }
    // Swaps the oldest element into `item`; the object `item` previously
    // held is handed back to the slot for reuse. Returns false when empty.
    bool pop(TPointer &item) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!full_ && head_ == tail_)
            return false;
        std::swap(buf_[tail_], item);
        full_ = false;
        tail_ = (tail_ + 1) % max_size_;
        return true;
    }
    // Logically discards all queued elements; the pre-allocated objects
    // stay in buf_ and keep being recycled.
    void reset() {
        std::lock_guard<std::mutex> lock(mutex_);
        head_ = tail_;
        full_ = false;
    }
    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return !full_ && head_ == tail_;
    }
    bool full() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return full_;
    }
    // max_size_ is const, so no lock is needed here.
    int capacity() const {
        return max_size_;
    }
    // Number of queued elements.
    int size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        if (full_)
            return max_size_;
        if (head_ >= tail_)
            return head_ - tail_;
        return max_size_ + head_ - tail_;
    }
private:
    TPointer buf_[SIZE];
    mutable std::mutex mutex_;  // mutable so const accessors can lock
    int head_ = 0;              // next slot to push into
    int tail_ = 0;              // next slot to pop from
    const int max_size_ = SIZE;
    bool full_ = false;
};
Moving smart pointers doesn't fit my needs, because then I would have to constantly allocate memory at runtime for new vector elements.
Not necessarily true if you pre-allocate enough storage and implement your own memory management a la simple segregated storage, a.k.a pooling.
If you do that, there's nothing keeping you from swapping around and you get to keep your existing architecture using any ring-buffer that supports swapping of elements and remain with the same thread-safety you had before. You can check the option of just using boost::pool
instead of implementing your own.
If I understand your task correctly, you need two containers: a FIFO queue for passing filled buffers from the producer to the consumer, and a pool (stack) of free buffers. With these you can do the following: at startup, allocate N elements and push them to the pool. The FIFO queue can be implemented in the following way:
class CyclicBufer
{
struct alignas(8) Position
{
ULONG _begin, _data_size;
};
std::atomic<Position> _pos;
void** _items;
ULONG _buf_size;
public:
// Requires: only one thread is allowed to push data to the CyclicBufer
bool push(void* item, bool* bWasEmpty = 0);
// Requires: only one thread is allowed to pop data to the CyclicBufer
bool pop(void** pitem, bool* bNotEmpty = 0);
~CyclicBufer()
{
if (_items)
{
delete [] _items;
}
}
CyclicBufer() : _items(0), _buf_size(0)
{
_pos._My_val._begin = 0, _pos._My_val._data_size = 0;
}
bool create(ULONG buf_size)
{
if (_items = new(std::nothrow) void*[buf_size])
{
_buf_size = buf_size;
return true;
}
return false;
}
bool is_empty()
{
Position current_pos = _pos.load(std::memory_order_relaxed);
return !current_pos._data_size;
}
};
// Producer side of the SPSC queue: writes `item` into the slot at the
// logical tail, then publishes it by incrementing _data_size with a
// release CAS. Safe only for a single producer thread.
bool CyclicBufer::push(void* item, bool* bWasEmpty /*= 0*/)
{
Position current_pos = _pos.load(std::memory_order_relaxed);
// full: the (single) producer is the only one who grows _data_size, so
// this check cannot be invalidated by the consumer
if (current_pos._data_size >= _buf_size) return false;
// (_pos._begin + _pos._data_size) % _buf_size never changed in pop
_items[(current_pos._begin + current_pos._data_size) % _buf_size] = item;
for (;;)
{
Position new_pos = {
current_pos._begin, current_pos._data_size + 1
};
// release order makes the slot write above visible to the consumer's
// acquire load before the new size can be observed
if (_pos.compare_exchange_weak(current_pos, new_pos, std::memory_order_release))
{
// report the empty -> non-empty transition so the caller can wake
// the consumer only when needed
if (bWasEmpty) *bWasEmpty = current_pos._data_size == 0;
return true;
}
// CAS failed: current_pos was reloaded; the slot written above is still
// correct because pop never changes (_begin + _data_size)
}
}
// Consumer side of the SPSC queue: reads the oldest slot, then retires it
// by advancing _begin and decrementing _data_size in one CAS. Safe only
// for a single consumer thread.
bool CyclicBufer::pop(void** pitem, bool* bNotEmpty /*= 0*/)
{
// acquire pairs with the release CAS in push, so the slot contents read
// below are at least as fresh as the observed _data_size
Position current_pos = _pos.load(std::memory_order_acquire);
if (!current_pos._data_size) return false;
// current_pos._begin never changed in push
void* item = _items[current_pos._begin];
for (;;)
{
Position new_pos = {
(current_pos._begin + 1) % _buf_size, current_pos._data_size - 1
};
if (_pos.compare_exchange_weak(current_pos, new_pos, std::memory_order_relaxed))
{
// tell the caller whether more data is still queued (lets the
// consumer drain before sleeping again)
if (bNotEmpty) *bNotEmpty = new_pos._data_size != 0;
*pitem = item;
return true;
}
// CAS failed: only push can have raced, and it never moves _begin,
// so `item` read above is still the oldest element
}
}
For a thread-safe and lock-free pool implementation on Windows, InterlockedPushEntrySList and InterlockedPopEntrySList can be used, but of course it is possible to implement this API yourself:
// Intrusive singly-linked node: objects kept in the lock-free stack embed
// (or are reinterpreted as) this link while they sit on the free list.
struct list_entry {
list_entry *Next;
};
#if defined(_M_X64) || defined(_M_ARM64)
#define MACHINE_64
#endif
// Intrusive lock-free LIFO stack modeled on the Windows SList API
// (InterlockedPushEntrySList / InterlockedPopEntrySList).
// DepthAndSequence packs the list depth (low 16 bits) together with a
// sequence counter that defends against the ABA problem; the head pointer
// and the counter are exchanged together — with a 128-bit CAS on 64-bit
// targets, or a 64-bit CAS (through `value`) on 32-bit targets.
//
// Fix vs. the original: in both 64-bit CAS calls, "&current" had been
// mangled into the mojibake "¤t" (an HTML-entity artifact), so the
// code did not compile; restored to &current.DepthAndSequence.
struct alignas(sizeof(PVOID)*2) list_head
{
union {
struct {
INT_PTR DepthAndSequence; // depth in the low word, ABA sequence above it
union {
list_entry* NextEntry;    // top of the stack, 0 when empty
INT_PTR iNextEntry;
};
};
__int64 value; // for 32-bit only: both fields viewed as one CAS-able word
};
void init()
{
iNextEntry = 0, DepthAndSequence = 0;
}
// Pushes entry; returns true if the list was empty before this push.
bool push(list_entry* entry)
{
list_head current = { { DepthAndSequence, NextEntry } }, new_head;
for (;;)
{
entry->Next = current.NextEntry;
new_head.NextEntry = entry;
// +1 depth (low word) and +1 ABA sequence (next word) in a single add
new_head.DepthAndSequence = current.DepthAndSequence + 0x10001;
#ifdef MACHINE_64
if (_INTRIN_RELEASE(_InterlockedCompareExchange128)(
&DepthAndSequence,
new_head.iNextEntry, new_head.DepthAndSequence,
&current.DepthAndSequence))
{
// true if the list was empty before this push
return !current.NextEntry;
}
// CAS failed: _InterlockedCompareExchange128 updated current in place
#else
new_head.value = _INTRIN_RELEASE(_InterlockedCompareExchange64)(
&value, new_head.value, current.value);
if (new_head.value == current.value)
{
// true if the list was empty before this push
return !current.NextEntry;
}
current.value = new_head.value;
#endif
}
}
// Pops the most recently pushed entry, or returns 0 when the list is empty.
list_entry* pop()
{
list_head current = { { DepthAndSequence, NextEntry } }, new_head;
for (;;)
{
list_entry* entry = current.NextEntry;
if (!entry)
{
return 0;
}
// entry must be valid memory: popped nodes may be read speculatively
// by a racing pop, so nodes must never be unmapped while in use
new_head.NextEntry = entry->Next;
new_head.DepthAndSequence = current.DepthAndSequence - 1;
#ifdef MACHINE_64
if (_INTRIN_ACQUIRE(_InterlockedCompareExchange128)(&DepthAndSequence,
new_head.iNextEntry, new_head.DepthAndSequence,
&current.DepthAndSequence))
{
return entry;
}
#else
new_head.value = _INTRIN_ACQUIRE(_InterlockedCompareExchange64)(
&value, new_head.value, current.value);
if (new_head.value == current.value)
{
return entry;
}
current.value = new_head.value;
#endif
}
}
};
#pragma warning(disable : 4324)
// Lock-free pool ("free list") of count pre-allocated _Ty objects, built on
// the intrusive lock-free stack list_head. pop() hands out an item and
// push() returns it; neither allocates after create().
//
// Fixes vs. the original:
//  - _items was a void* and the destructor did `delete[]` through it, which
//    is undefined behavior; it is now stored as the allocated type Chunk*;
//  - create(0) used to wrap `--count` in the do/while and push ~2^32 bogus
//    entries; it now fails fast for count == 0.
template <class _Ty>
class FreeItems : list_head
{
    // Each slot is big enough for either a _Ty payload or the free-list
    // link that threads through unused slots.
    union Chunk {
        list_entry entry;
        char buf[sizeof(_Ty)];
    };
    Chunk* _items; // owning pointer to the slot array (0 until create())
public:
    ~FreeItems()
    {
        if (_items)
        {
            delete [] _items;
        }
    }
    FreeItems() : _items(0)
    {
        init();
    }
    // Allocates `count` slots and pushes every one onto the free list.
    // Returns false on allocation failure or when count == 0.
    bool create(ULONG count)
    {
        if (!count)
        {
            return false;
        }
        if (Chunk* items = new(std::nothrow) Chunk[count])
        {
            _items = items;
            union {
                list_entry* entry;
                Chunk* item;
            };
            item = items;
            do
            {
                list_head::push(entry);
            } while (item++, --count);
            return true;
        }
        return false;
    }
    // Takes an item from the pool; returns 0 when the pool is empty.
    _Ty* pop()
    {
        return (_Ty*)list_head::pop();
    }
    // Returns an item to the pool; true when the pool was empty before.
    bool push(_Ty* item)
    {
        return list_head::push((list_entry*)item);
    }
};
With these two containers, the demo/test code can look like the following (the code is for Windows, but the main point is how we use the pool and the queue):
// Payload type for the demo; _id carries the sequence number the consumer
// uses to verify FIFO ordering.
struct BigData
{
ULONG _id;
};
// Demo harness wiring the lock-free FIFO (CyclicBufer) and the lock-free
// pool (FreeItems<BigData>) together with one producer and one consumer
// thread. The Win32 events are only signaled on empty -> non-empty
// transitions (of the queue and of the pool respectively), so the threads
// sleep instead of spinning when they have nothing to do.
// NOTE(review): std::_Atomic_integral_t / _MT_INCR / _MT_DECR are
// MSVC-STL-internal; portable code would use std::atomic<ULONG>. Likewise
// _bStop is a plain bool written by Stop() and read by the producer thread
// without synchronization — presumably tolerated in this demo, but
// formally a data race; confirm or make it std::atomic<bool>.
struct CPData : CyclicBufer, FreeItems<BigData>
{
HANDLE _hDataEvent, _hFreeEvent, _hConsumerStop, _hProducerStop;
ULONG _waitReadId, _writeId, _setFreeCount, _setDataCount;
std::_Atomic_integral_t _dwRefCount;
bool _bStop;
// Thread entry point: runs the producer loop, then drops the reference
// taken for this thread in StartThread().
static ULONG WINAPI sProducer(void* This)
{
reinterpret_cast<CPData*>(This)->Producer();
reinterpret_cast<CPData*>(This)->Release();
return __LINE__;
}
void Producer()
{
HANDLE Handles[] = { _hProducerStop, _hFreeEvent };
for (;;)
{
BigData* item;
// grab free buffers from the pool and queue them until the pool is dry
while (!_bStop && (item = FreeItems::pop()))
{
// init data item
item->_id = _writeId++;
bool bWasEmpty;
// push cannot fail: queue and pool were created with the same capacity
if (!CyclicBufer::push(item, &bWasEmpty)) __debugbreak();
if (bWasEmpty)
{
// queue went empty -> non-empty: wake the consumer
_setDataCount++;
SetEvent(_hDataEvent);
}
}
// sleep until stop is requested or the consumer frees a buffer
switch (WaitForMultipleObjects(2, Handles, FALSE, INFINITE))
{
case WAIT_OBJECT_0:
// stop requested: forward it to the consumer and exit
SetEvent(_hConsumerStop);
return;
case WAIT_OBJECT_0 + 1:
break;
default:
__debugbreak();
}
}
}
// Thread entry point: runs the consumer loop, then drops the reference
// taken for this thread in StartThread().
static ULONG WINAPI sConsumer(void* This)
{
reinterpret_cast<CPData*>(This)->Consumer();
reinterpret_cast<CPData*>(This)->Release();
return __LINE__;
}
void Consumer()
{
HANDLE Handles[] = { _hDataEvent, _hConsumerStop };
for (;;)
{
// wake either on new data or on the (manual-reset) stop event
switch (WaitForMultipleObjects(2, Handles, FALSE, INFINITE))
{
case WAIT_OBJECT_0:
break;
case WAIT_OBJECT_0 + 1:
return;
default:
__debugbreak();
}
bool bNotEmpty;
// drain the queue completely before sleeping again
do
{
BigData* item;
if (!CyclicBufer::pop((void**)&item, &bNotEmpty)) __debugbreak();
// check FIFO order
if (item->_id != _waitReadId) __debugbreak();
_waitReadId++;
// process item
// free item to the pool
if (FreeItems::push(item))
{
// stack was empty
// pool went empty -> non-empty: wake the producer
_setFreeCount++;
SetEvent(_hFreeEvent);
}
} while (bNotEmpty);
}
}
~CPData()
{
if (_hConsumerStop) CloseHandle(_hConsumerStop);
if (_hProducerStop) CloseHandle(_hProducerStop);
if (_hFreeEvent) CloseHandle(_hFreeEvent);
if (_hDataEvent) CloseHandle(_hDataEvent);
// sanity check: every produced item must have been consumed
if (_waitReadId != _writeId || !CyclicBufer::is_empty()) __debugbreak();
DbgPrint("%s(%u %u %u)\n", __FUNCTION__, _writeId, _setFreeCount, _setDataCount);
}
public:
CPData()
{
_hFreeEvent = 0, _hDataEvent = 0, _hProducerStop = 0, _hConsumerStop = 0;
// creator holds the initial reference; each thread adds its own
_waitReadId = 0, _writeId = 0, _dwRefCount = 1;
_setFreeCount = 0, _setDataCount = 0, _bStop = false;
}
void AddRef()
{
_MT_INCR(_dwRefCount);
}
// Deletes the object when the last reference (creator or thread) is gone.
void Release()
{
if (!_MT_DECR(_dwRefCount))
{
delete this;
}
}
// Creates the queue, the pool (same capacity n) and the four events.
// Returns 0 on success or a Win32 error code. The data/free events are
// auto-reset; the two stop events are manual-reset.
ULONG Create(ULONG n)
{
if (!CyclicBufer::create(n) || !FreeItems::create(n))
{
return ERROR_NO_SYSTEM_RESOURCES;
}
return (_hDataEvent = CreateEvent(0, FALSE, FALSE, 0)) &&
(_hFreeEvent = CreateEvent(0, FALSE, FALSE, 0)) &&
(_hProducerStop = CreateEvent(0, TRUE, FALSE, 0)) &&
(_hConsumerStop = CreateEvent(0, TRUE, FALSE, 0)) ? 0 : GetLastError();
}
// Starts a producer (false) or consumer (true) thread; the thread owns
// one reference, released in its static entry point.
ULONG StartThread(bool bConsumer)
{
AddRef();
if (HANDLE hThread = CreateThread(0, 0, bConsumer ? sConsumer : sProducer, this, 0, 0))
{
CloseHandle(hThread);
return 0;
}
Release();
return GetLastError();
}
// Requests shutdown: signals the producer's stop event and sets the flag
// the producer polls inside its inner loop.
ULONG Stop()
{
ULONG err = SetEvent(_hProducerStop) ? 0 : GetLastError();
_bStop = true;
return err;
}
};
void BufTest()
{
if (CPData* p = new CPData)
{
if (!p->Create(16))
{
if (!p->StartThread(false))
{
p->StartThread(true);
}
MessageBoxW(0, 0, L"Wait Stop", MB_ICONINFORMATION);
p->Stop();
}
p->Release();
}
MessageBoxW(0,0,0,1);
}
If you love us, you can donate to us via PayPal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With