So I'm attempting to create copy-on-write map that uses an attempt at atomic reference counting on the read-side to not have locking.
Something isn't quite right. I see some references getting over-incremented and some are going down negative, so something isn't really atomic. In my tests I have 10 reader threads looping 100 times each doing a get() and 1 writer thread doing 100 writes.
It gets stuck in the writer because some of the references never go down to zero, even though they should.
I'm attempting to use the 128-bit DCAS technique laid explained by this blog.
Is there something blatantly wrong with this or is there an easier way to debugging this rather than playing with it in the debugger?
typedef std::unordered_map<std::string, std::string> StringMap;
static const int zero = 0; //provides an l-value for asm code
class NonBlockingReadMapCAS {
public:
class OctaWordMapWrapper {
public:
StringMap* fStringMap;
//std::atomic<int> fCounter;
int64_t fCounter;
OctaWordMapWrapper(OctaWordMapWrapper* copy) : fStringMap(new StringMap(*copy->fStringMap)), fCounter(0) { }
OctaWordMapWrapper() : fStringMap(new StringMap), fCounter(0) { }
~OctaWordMapWrapper() {
delete fStringMap;
}
/**
* Does a compare and swap on an octa-word - in this case, our two adjacent class members fStringMap
* pointer and fCounter.
*/
static bool inline doubleCAS(OctaWordMapWrapper* target, StringMap* compareMap, int64_t compareCounter, StringMap* swapMap, int64_t swapCounter ) {
bool cas_result;
__asm__ __volatile__
(
"lock cmpxchg16b %0;" // cmpxchg16b sets ZF on success
"setz %3;" // if ZF set, set cas_result to 1
: "+m" (*target),
"+a" (compareMap), //compare target's stringmap pointer to compareMap
"+d" (compareCounter), //compare target's counter to compareCounter
"=q" (cas_result) //results
: "b" (swapMap), //swap target's stringmap pointer with swapMap
"c" (swapCounter) //swap target's counter with swapCounter
: "cc", "memory"
);
return cas_result;
}
OctaWordMapWrapper* atomicIncrementAndGetPointer()
{
if (doubleCAS(this, this->fStringMap, this->fCounter, this->fStringMap, this->fCounter +1))
return this;
else
return NULL;
}
OctaWordMapWrapper* atomicDecrement()
{
while(true) {
if (doubleCAS(this, this->fStringMap, this->fCounter, this->fStringMap, this->fCounter -1))
break;
}
return this;
}
bool atomicSwapWhenNotReferenced(StringMap* newMap)
{
return doubleCAS(this, this->fStringMap, zero, newMap, 0);
}
}
__attribute__((aligned(16)));
std::atomic<OctaWordMapWrapper*> fReadMapReference;
pthread_mutex_t fMutex;
NonBlockingReadMapCAS() {
fReadMapReference = new OctaWordMapWrapper();
}
~NonBlockingReadMapCAS() {
delete fReadMapReference;
}
bool contains(const char* key) {
std::string keyStr(key);
return contains(keyStr);
}
bool contains(std::string &key) {
OctaWordMapWrapper *map;
do {
map = fReadMapReference.load()->atomicIncrementAndGetPointer();
} while (!map);
bool result = map->fStringMap->count(key) != 0;
map->atomicDecrement();
return result;
}
std::string get(const char* key) {
std::string keyStr(key);
return get(keyStr);
}
std::string get(std::string &key) {
OctaWordMapWrapper *map;
do {
map = fReadMapReference.load()->atomicIncrementAndGetPointer();
} while (!map);
//std::cout << "inc " << map->fStringMap << " cnt " << map->fCounter << "\n";
std::string value = map->fStringMap->at(key);
map->atomicDecrement();
return value;
}
void put(const char* key, const char* value) {
std::string keyStr(key);
std::string valueStr(value);
put(keyStr, valueStr);
}
void put(std::string &key, std::string &value) {
pthread_mutex_lock(&fMutex);
OctaWordMapWrapper *oldWrapper = fReadMapReference;
OctaWordMapWrapper *newWrapper = new OctaWordMapWrapper(oldWrapper);
std::pair<std::string, std::string> kvPair(key, value);
newWrapper->fStringMap->insert(kvPair);
fReadMapReference.store(newWrapper);
std::cout << oldWrapper->fCounter << "\n";
while (oldWrapper->fCounter > 0);
delete oldWrapper;
pthread_mutex_unlock(&fMutex);
}
void clear() {
pthread_mutex_lock(&fMutex);
OctaWordMapWrapper *oldWrapper = fReadMapReference;
OctaWordMapWrapper *newWrapper = new OctaWordMapWrapper(oldWrapper);
fReadMapReference.store(newWrapper);
while (oldWrapper->fCounter > 0);
delete oldWrapper;
pthread_mutex_unlock(&fMutex);
}
};
Maybe not the answer but this looks suspicious to me:
while (oldWrapper->fCounter > 0);
delete oldWrapper;
You could have a reader thread just entering atomicIncrementAndGetPointer()
when the counter is 0 thus pulling the rug underneath the reader thread by deleting the wrapper.
Edit to sum up the comments below for potential solution:
The best implementation I'm aware of is to move fCounter
from OctaWordMapWrapper
to fReadMapReference
(You don't need the OctaWordMapWrapper
class at all actually). When the counter is zero swap the pointer in your writer. Because you can have high contention of reader threads which essentially blocks the writer indefinitely you can have highest bit of fCounter
allocated for reader lock, i.e. while this bit is set the readers spin until the bit is cleared. The writer sets this bit (__sync_fetch_and_or()
) when it's about to change the pointer, waits for the counter to fall down to zero (i.e. existing readers finish their work) and then swap the pointer and clears the bit.
This approach should be waterproof, though it's obviously blocking readers upon writes. I don't know if this is acceptable in your situation and ideally you would like this to be non-blocking.
The code would look something like this (not tested!):
class NonBlockingReadMapCAS
{
public:
NonBlockingReadMapCAS() :m_ptr(0), m_counter(0) {}
private:
StringMap *acquire_read()
{
while(1)
{
uint32_t counter=atom_inc(m_counter);
if(!(counter&0x80000000))
return m_ptr;
atom_dec(m_counter);
while(m_counter&0x80000000);
}
return 0;
}
void release_read()
{
atom_dec(m_counter);
}
void acquire_write()
{
uint32_t counter=atom_or(m_counter, 0x80000000);
assert(!(counter&0x80000000));
while(m_counter&0x7fffffff);
}
void release_write()
{
atom_and(m_counter, uint32_t(0x7fffffff));
}
StringMap *volatile m_ptr;
volatile uint32_t m_counter;
};
Just call acquire/release_read/write() before & after accessing the pointer for read/write. Replace atom_inc/dec/or/and()
with __sync_fetch_and_add()
, __sync_fetch_and_sub()
, __sync_fetch_and_or()
and __sync_fetch_and_and()
respectively. You don't need doubleCAS()
for this actually.
As noted correctly by @Quuxplusone in a comment below this is single producer & multiple consumer implementation. I modified the code to assert properly to enforce this.
Well, there are probably lots of problems, but here are the obvious two.
The most trivial bug is in atomicIncrementAndGetPointer
. You wrote:
if (doubleCAS(this, this->fStringMap, this->fCounter, this->fStringMap, this->fCounter +1))
That is, you're attempting to increment this->fCounter
in a lock-free way. But it doesn't work, because you're fetching the old value twice with no guarantee that the same value is read each time. Consider the following sequence of events:
this->fCounter
(with value 0) and computes argument 5 as this->fCounter +1
= 1
.this->fCounter
(with value 1) and computes argument 3 as this->fCounter
= 1
.doubleCAS(this, this->fStringMap, 1, this->fStringMap, 1)
. It succeeds, of course, but we've lost the "increment" we were trying to do.What you wanted is more like
StringMap* oldMap = this->fStringMap;
int64_t oldCounter = this->fCounter;
if (doubleCAS(this, oldMap, oldValue, oldMap, oldValue+1))
...
The other obvious problem is that there's a data race between get
and put
. Consider the following sequence of events:
get
: it fetches fReadMapReference.load()
and prepares to execute atomicIncrementAndGetPointer
on that memory address.put
: it deletes that memory address. (It is within its rights to do so, because the wrapper's reference count is still at zero.)atomicIncrementAndGetPointer
on the deleted memory address. If you're lucky, you segfault, but of course in practice you probably won't.As explained in the blog post:
The garbage collection interface is omitted, but in real applications you would need to scan the hazard pointers before deleting a node.
Another user has suggested a similar approach, but if you are compiling with gcc (and perhaps with clang), you could use the intrinsic __sync_add_and_fetch_4 which does something similar to what your assembly code does, and is likely much more portable. I have used it when I implemented refcounting in an Ada library (but the algorithm remains the same).
int __sync_add_and_fetch_4 (int* ptr, int value);
// increments the value pointed to by ptr by value, and returns the new value
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With