I have a class that stores the latest value of some incoming realtime data (around 150 million events/second).
Suppose it looks like this:
class DataState
{
    Event latest_event;
public:
    // pushes event atomically
    void push_event(const Event* __restrict__ e);
    // pulls event atomically
    Event pull_event();
};
I need to be able to push events atomically and pull them with strict ordering guarantees. Now, I know I can use a spinlock, but given the massive event rate (over 100 million/second) and high degree of concurrency I'd prefer to use lockfree operations.
The problem is that Event is 64 bytes in size. There is no 64-byte atomic compare-exchange on any current x86 CPU (as of August '16); the widest available is the 16-byte CMPXCHG16B. So if I use std::atomic<Event> I'd have to link against libatomic, which uses mutexes under the hood (too slow).
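For reference, a quick standalone check (using a hypothetical 64-byte stand-in for Event) confirms whether std::atomic<Event> would actually be lock-free on a given target:

#include <atomic>
#include <cstdio>

struct Event { char payload[64]; };   // stand-in for the real 64-byte Event

int main() {
    std::atomic<Event> e;
    // For a 64-byte type this typically prints 0: the operations fall back to a
    // lock inside libatomic, which is exactly the overhead we want to avoid.
    // (Building this check may itself require linking with -latomic.)
    std::printf("std::atomic<Event> lock-free: %d\n", (int)e.is_lock_free());
}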
So my solution was to instead atomically swap pointers to the value. Problem is dynamic memory allocation becomes a bottleneck with these event rates. So... I define something I call a "ring allocator":
/// @brief Lockfree Static short-lived allocator used for a ringbuffer
/// Elements are guaranteed to persist only for "size" calls to get_next()
template<typename T> class RingAllocator {
    T *arena;
    std::atomic_size_t arena_idx;
    const std::size_t arena_size;
public:
    /// @brief Creates a new RingAllocator
    /// @param size The number of elements in the underlying arena. Make this large enough to avoid overwriting fresh data
    RingAllocator<T>(std::size_t size) : arena_size(size)
    {
        // allocate pool
        arena = new T[size];
        // zero out pool
        std::memset(arena, 0, sizeof(T) * size);
        arena_idx = 0;
    }

    ~RingAllocator()
    {
        delete[] arena;
    }

    /// @brief Return next element's pointer. Thread-safe
    /// @return pointer to next available element
    T *get_next()
    {
        return &arena[arena_idx.exchange(arena_idx++ % arena_size)];
    }
};
Then I could have my DataState class look like this:
class DataState
{
    std::atomic<Event*> latest_event;
    RingAllocator<Event> event_allocator;
public:
    // pushes event atomically
    void push_event(const Event* __restrict__ e)
    {
        // store event
        Event *new_ptr = event_allocator.get_next();
        *new_ptr = *e;
        // swap event pointers
        latest_event.store(new_ptr, std::memory_order_release);
    }
    // pulls event atomically
    Event pull_event()
    {
        return *(latest_event.load(std::memory_order_acquire));
    }
};
As long as I size my ring allocator to the max # of threads that may concurrently call the functions, there's no risk of overwriting data that pull_event could return. Plus everything's super localized so indirection won't cause bad cache performance. Any possible pitfalls with this approach?
DataState class: I thought it was going to be a stack or queue, but it isn't, so push / pull don't seem like good names for the methods. (Or else the implementation is totally bogus.)
It's just a latch that lets you read the last event that any thread stored.
There's nothing to stop two writes in a row from overwriting an element that's never been read. There's also nothing to stop you reading the same element twice.
If you just need somewhere to copy small blocks of data, a ring buffer does seem like a decent approach. But if you don't want to lose events, I don't think you can use it this way. Instead, just get a ring buffer entry, then copy to it and use it there. So the only atomic operation should be incrementing the ring buffer position index.
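As a rough sketch of what I mean (hypothetical names, and assuming the power-of-two sizing discussed below): the producer claims a slot with a single fetch_add and then copies into and works on that slot directly, with no second atomic pointer store:

#include <atomic>
#include <cstddef>

template<typename T>
class RingBuffer {
    T *slots;
    std::atomic<std::size_t> write_idx{0};
    const std::size_t size_mask;               // capacity - 1; capacity must be a power of 2
public:
    explicit RingBuffer(std::size_t capacity)
        : slots(new T[capacity]()), size_mask(capacity - 1) {}
    ~RingBuffer() { delete[] slots; }

    // The only shared atomic operation: claim the next slot index.
    T *claim_slot() {
        std::size_t idx = write_idx.fetch_add(1, std::memory_order_relaxed);
        return &slots[idx & size_mask];
    }
};

// usage: claim a slot, copy the incoming event into it, and work on it in place
//   Event *slot = ring.claim_slot();
//   *slot = incoming_event;
//   process(*slot);

Note that this still doesn't stop writers from lapping a slow reader; it only reduces the shared atomic work to one increment per event.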
You can make get_next() much more efficient. This line does an atomic post-increment (fetch_add) and an atomic exchange:
return &arena[arena_idx.exchange(arena_idx++ % arena_size)];
I'm not even sure it's safe, because the xchg can maybe step on the fetch_add from another thread. Anyway, even if it's safe, it's not ideal.
You don't need that. Make sure the arena_size is always a power of 2, then you don't need to modulo the shared counter. You can just let it go, and have every thread modulo it for their own use. It will eventually wrap, but it's a binary integer so it will wrap at a power of 2, which is a multiple of your arena size.
I'd suggest storing an AND-mask instead of a size, so there's no risk of the % compiling to anything other than an and instruction, even if it's not a compile-time constant. This makes sure we avoid a 64-bit integer div instruction.
template<typename T> class RingAllocator {
    T *arena;
    std::atomic_size_t arena_idx;
    const std::size_t size_mask;  // maybe even make this a template parameter?
public:
    RingAllocator<T>(std::size_t size)
        : arena_idx(0), size_mask(size - 1)
    {
        // verify that size is actually a power of two, so the mask is all-ones in the
        // low bits and all-zeros in the high bits, i.e. i % size == i & size_mask for all i
        assert(size != 0 && (size & (size - 1)) == 0);
        ...   // allocate the arena as before
    }
    ...
    T *get_next() {
        size_t idx = arena_idx.fetch_add(1, std::memory_order_relaxed); // still atomic, but we don't care which order different threads take blocks in
        idx &= size_mask;  // modulo our local copy of the idx
        return &arena[idx];
    }
};
Allocating the arena would be more efficient if you used calloc instead of new + memset. The OS already zeros pages before giving them to user-space processes (to prevent information leakage), so writing them all yourself is just wasted work.
arena = new T[size];
std::memset(arena, 0, sizeof(T) * size);
// vs.
arena = (T*)calloc(size, sizeof(T));
Writing the pages yourself does fault them in, so they're all wired to real physical pages, instead of just copy-on-write mappings for a system-wide shared physical zero page (like they are after new/malloc/calloc). On a NUMA system, the physical page chosen might depend on which thread actually touched the page, rather than which thread did the allocation. But since you're reusing the pool, the first core to write a page might not be the one that ends up using it most.
Maybe something to look for in microbenchmarks / perf counters.
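If that turns out to matter, one option is to pre-fault the arena from the thread that will actually use it most, so first-touch placement puts the pages on that thread's NUMA node. A rough sketch (the prefault helper and the 4096-byte page size are assumptions; query the real page size with sysconf(_SC_PAGESIZE)):

#include <cstddef>

// Touch one byte per page so the calling thread's NUMA node gets the physical pages
// (first-touch policy). Call this from the thread that will use the arena most.
void prefault(char *base, std::size_t bytes, std::size_t page_size = 4096) {
    volatile char *p = base;
    for (std::size_t off = 0; off < bytes; off += page_size)
        p[off] = 0;   // volatile write: forces a private page and can't be optimized away
}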
As long as I size my ring allocator to the max # of threads that may concurrently call the functions, there's no risk of overwriting data that pull_event could return. .... Any possible pitfalls with this approach?
The pitfall is, IIUC, that your statement is wrong.
If I have just 2 threads, and 10 elements in the ring buffer, the first thread could call pull_event once, and be "mid-pulling", and then the second thread could call push 10 times, overwriting what thread 1 is pulling.
Again, assuming I understand your code correctly.
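To make that concrete, here is a single-threaded simulation of the interleaving (Event replaced by a hypothetical 64-byte stand-in, and the two "threads" played out in program order; in a real run the stalled reader's copy could also be torn mid-write):

#include <atomic>
#include <cstddef>
#include <cstdio>

struct Event { long seq; char pad[56]; };        // 64-byte stand-in for the real Event

int main() {
    const std::size_t N = 10;                    // ring of 10 elements, as in the scenario above
    Event arena[N] = {};
    std::atomic<Event*> latest{&arena[0]};
    std::atomic<std::size_t> idx{0};

    // "Thread 1" starts a pull and is paused mid-pull, still holding the pointer:
    Event *held = latest.load(std::memory_order_acquire);

    // "Thread 2" pushes 10 events before thread 1 finishes:
    for (long s = 1; s <= 10; ++s) {
        Event *slot = &arena[idx.fetch_add(1, std::memory_order_relaxed) % N];
        slot->seq = s;                           // reuses every slot, including the one 'held' points to
        latest.store(slot, std::memory_order_release);
    }

    // Thread 1 resumes and finishes its pull: the data under its pointer has been overwritten.
    std::printf("seq seen by the stalled reader: %ld\n", held->seq);   // 1, not the 0 it started pulling
}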
Also, as mentioned above,
return &arena[arena_idx.exchange(arena_idx++ % arena_size)];
that arena_idx++ inside the exchange on the same variable just looks wrong. And in fact it is wrong: two threads can both increment first - thread A increments to 8 and thread B increments to 9 - and then thread B exchanges it to 9, then thread A exchanges it to 8. Whoops.
atomic(op1) @ atomic(op2) != atomic(op1 @ op2)
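Spelled out as a hypothetical helper, the one-liner really is two independent atomic RMW operations with a window between them:

#include <atomic>
#include <cstddef>

// Not a fix, just the original expression decomposed into its separate steps.
std::size_t broken_get_index(std::atomic_size_t &arena_idx, std::size_t arena_size) {
    std::size_t incremented_from = arena_idx.fetch_add(1);                      // the arena_idx++ part
    std::size_t before_exchange  = arena_idx.exchange(incremented_from % arena_size);
    return before_exchange;                                                     // this value indexes the arena
    // Another thread's fetch_add/exchange can land between the two lines above,
    // so indices can be handed out twice or skipped entirely.
}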
I worry about what else is wrong in code not shown. I don't mean that as an insult - lock-free is just not easy.