
An attempt to create atomic reference counting is failing with deadlock. Is this the right approach?

I'm trying to build a copy-on-write map that uses atomic reference counting on the read side so that readers never take a lock.

Something isn't quite right. Some reference counts get over-incremented and others go negative, so some operation isn't really atomic. My test runs 10 reader threads, each calling get() 100 times in a loop, and 1 writer thread doing 100 writes.

It gets stuck in the writer because some of the references never go down to zero, even though they should.

I'm attempting to use the 128-bit DCAS technique explained in this blog post.

Is there something blatantly wrong with this, or is there an easier way to debug it than stepping through it in the debugger?

typedef std::unordered_map<std::string, std::string> StringMap;

static const int zero = 0;  //provides an l-value for asm code

class NonBlockingReadMapCAS {

public:

    class OctaWordMapWrapper {
    public:
        StringMap* fStringMap;
        //std::atomic<int> fCounter;
        int64_t fCounter;

        OctaWordMapWrapper(OctaWordMapWrapper* copy) : fStringMap(new StringMap(*copy->fStringMap)), fCounter(0) { }

        OctaWordMapWrapper() : fStringMap(new StringMap), fCounter(0) { }

        ~OctaWordMapWrapper() {
            delete fStringMap;
        }

        /**
         * Does a compare and swap on an octa-word - in this case, our two adjacent class members fStringMap 
         * pointer and fCounter.
         */
        static bool inline doubleCAS(OctaWordMapWrapper* target, StringMap* compareMap, int64_t compareCounter, StringMap* swapMap, int64_t swapCounter ) {
            bool cas_result;
            __asm__ __volatile__
            (
             "lock cmpxchg16b %0;"    // cmpxchg16b sets ZF on success
             "setz       %3;"         // if ZF set, set cas_result to 1

             : "+m" (*target),
               "+a" (compareMap),     //compare target's stringmap pointer to compareMap
               "+d" (compareCounter), //compare target's counter to compareCounter
               "=q" (cas_result)      //results
             : "b"  (swapMap),        //swap target's stringmap pointer with swapMap
               "c"  (swapCounter)     //swap target's counter with swapCounter
             : "cc", "memory"
             );
            return cas_result;
        }



        OctaWordMapWrapper* atomicIncrementAndGetPointer()
        {
            if (doubleCAS(this, this->fStringMap, this->fCounter, this->fStringMap, this->fCounter +1))
                return this;
            else
                return NULL;
        }


        OctaWordMapWrapper* atomicDecrement()
        {
            while(true) {
                if (doubleCAS(this, this->fStringMap, this->fCounter, this->fStringMap, this->fCounter -1))
                    break;
            }
            return this;
        }

        bool atomicSwapWhenNotReferenced(StringMap* newMap)
        {
            return doubleCAS(this, this->fStringMap, zero, newMap, 0);
        }
    }
    __attribute__((aligned(16)));

    std::atomic<OctaWordMapWrapper*> fReadMapReference;
    pthread_mutex_t fMutex;


    NonBlockingReadMapCAS()  {
        fReadMapReference = new OctaWordMapWrapper();
    }

    ~NonBlockingReadMapCAS() {
       delete fReadMapReference;
    }

    bool contains(const char* key) {
        std::string keyStr(key);
        return contains(keyStr);
    }

    bool contains(std::string &key) {
        OctaWordMapWrapper *map;
        do {
            map = fReadMapReference.load()->atomicIncrementAndGetPointer();
        } while (!map);
        bool result = map->fStringMap->count(key) != 0;
        map->atomicDecrement();
        return result;
    }

    std::string get(const char* key) {
        std::string keyStr(key);
        return get(keyStr);
    }

    std::string get(std::string &key) {
        OctaWordMapWrapper *map;
        do {
            map = fReadMapReference.load()->atomicIncrementAndGetPointer();
        } while (!map);
        //std::cout << "inc " << map->fStringMap << " cnt " << map->fCounter << "\n";
        std::string value = map->fStringMap->at(key);
        map->atomicDecrement();
        return value;
    }

    void put(const char* key, const char* value) {
        std::string keyStr(key);
        std::string valueStr(value);
        put(keyStr, valueStr);
    }

    void put(std::string &key, std::string &value) {
        pthread_mutex_lock(&fMutex);
        OctaWordMapWrapper *oldWrapper = fReadMapReference;
        OctaWordMapWrapper *newWrapper = new OctaWordMapWrapper(oldWrapper);
        std::pair<std::string, std::string> kvPair(key, value);
        newWrapper->fStringMap->insert(kvPair);
        fReadMapReference.store(newWrapper);
        std::cout << oldWrapper->fCounter << "\n";
        while (oldWrapper->fCounter > 0);
        delete oldWrapper;
        pthread_mutex_unlock(&fMutex);

    }

    void clear() {
        pthread_mutex_lock(&fMutex);
        OctaWordMapWrapper *oldWrapper = fReadMapReference;
        OctaWordMapWrapper *newWrapper = new OctaWordMapWrapper(oldWrapper);
        fReadMapReference.store(newWrapper);
        while (oldWrapper->fCounter > 0);
        delete oldWrapper;
        pthread_mutex_unlock(&fMutex);

    }

};
marathon asked Jul 17 '14 01:07


3 Answers

Maybe not the answer but this looks suspicious to me:

while (oldWrapper->fCounter > 0);
delete oldWrapper;

A reader thread could be just entering atomicIncrementAndGetPointer() while the counter is still 0. The writer then deletes the wrapper and pulls the rug out from under that reader.

Edit, summing up the comments below into a potential solution:

The best implementation I'm aware of moves fCounter out of OctaWordMapWrapper and keeps it alongside fReadMapReference (you don't actually need the OctaWordMapWrapper class at all). The writer swaps the pointer once the counter is zero. Because heavy contention from reader threads could block the writer indefinitely, you can reserve the highest bit of fCounter as a reader lock: while this bit is set, readers spin until it is cleared. The writer sets the bit (__sync_fetch_and_or()) when it is about to change the pointer, waits for the counter to drop to zero (i.e. existing readers finish their work), then swaps the pointer and clears the bit.

This approach should be watertight, though it obviously blocks readers during writes. I don't know whether that is acceptable in your situation; ideally you would want this to be non-blocking.

The code would look something like this (not tested!):

class NonBlockingReadMapCAS
{
public:
  NonBlockingReadMapCAS() :m_ptr(0), m_counter(0) {}

private:
  StringMap *acquire_read()
  {
    while(1)
    {
      uint32_t counter=atom_inc(m_counter);
      if(!(counter&0x80000000))
        return m_ptr;
      atom_dec(m_counter);
      while(m_counter&0x80000000);
    }
    return 0;
  }

  void release_read()
  {
    atom_dec(m_counter);
  }

  void acquire_write()
  {
    uint32_t counter=atom_or(m_counter, 0x80000000);
    assert(!(counter&0x80000000));
    while(m_counter&0x7fffffff);
  }

  void release_write()
  {
    atom_and(m_counter, uint32_t(0x7fffffff));
  }

  StringMap *volatile m_ptr;
  volatile uint32_t m_counter;
};

Just call acquire/release_read/write() before & after accessing the pointer for read/write. Replace atom_inc/dec/or/and() with __sync_fetch_and_add(), __sync_fetch_and_sub(), __sync_fetch_and_or() and __sync_fetch_and_and() respectively. You don't need doubleCAS() for this actually.

As @Quuxplusone correctly noted in a comment below, this is a single-producer, multiple-consumer implementation. I modified the code to assert this so that it is enforced.
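For comparison, here is a minimal sketch of the same reader-count-plus-writer-bit scheme written with std::atomic instead of the __sync builtins. The class name GatedMap, the constant kWriterBit, and the raw_counter() accessor are hypothetical names chosen for illustration, and the sketch keeps the single-writer assumption stated above. It has not been stress-tested.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <string>
#include <thread>
#include <unordered_map>

typedef std::unordered_map<std::string, std::string> StringMap;

// Highest bit marks "writer is swapping"; the low 31 bits count active readers.
const uint32_t kWriterBit = 0x80000000u;

// Hypothetical class: many readers, exactly ONE writer thread (assumption).
class GatedMap {
public:
    GatedMap() : map_(new StringMap), counter_(0) {}
    ~GatedMap() { delete map_; }

    bool contains(const std::string& key) {
        acquire_read();
        bool found = map_->count(key) != 0;
        release_read();
        return found;
    }

    void put(const std::string& key, const std::string& value) {
        // Only the single writer replaces map_, so copying it here is safe.
        StringMap* next = new StringMap(*map_);
        (*next)[key] = value;
        acquire_write();          // wait until no reader holds the old map
        StringMap* old = map_;
        map_ = next;
        release_write();
        delete old;               // no reader can still be using it
    }

    uint32_t raw_counter() const { return counter_.load(); }

private:
    void acquire_read() {
        for (;;) {
            // fetch_add returns the value held BEFORE the increment.
            uint32_t before = counter_.fetch_add(1);
            if (!(before & kWriterBit)) return;
            counter_.fetch_sub(1);  // back off: a writer is active
            while (counter_.load() & kWriterBit) std::this_thread::yield();
        }
    }
    void release_read() { counter_.fetch_sub(1); }

    void acquire_write() {
        uint32_t before = counter_.fetch_or(kWriterBit);
        assert(!(before & kWriterBit));  // enforces the single-writer assumption
        (void)before;
        while (counter_.load() & ~kWriterBit) std::this_thread::yield();
    }
    void release_write() { counter_.fetch_and(~kWriterBit); }

    StringMap* map_;
    std::atomic<uint32_t> counter_;
};
```

Readers call contains() from any thread; put() must only ever be called from one thread. The default sequentially consistent ordering is used throughout to keep the sketch simple rather than fast.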

JarkkoL answered Nov 08 '22 17:11


Well, there are probably lots of problems, but here are the obvious two.

The most trivial bug is in atomicIncrementAndGetPointer. You wrote:

if (doubleCAS(this, this->fStringMap, this->fCounter, this->fStringMap, this->fCounter +1))

That is, you're attempting to increment this->fCounter in a lock-free way. But it doesn't work, because you're fetching the old value twice with no guarantee that the same value is read each time. Consider the following sequence of events:

  • Thread A fetches this->fCounter (with value 0) and computes argument 5 as this->fCounter +1 = 1.
  • Thread B successfully increments the counter.
  • Thread A fetches this->fCounter (with value 1) and computes argument 3 as this->fCounter = 1.
  • Thread A executes doubleCAS(this, this->fStringMap, 1, this->fStringMap, 1). It succeeds, of course, but we've lost the "increment" we were trying to do.

What you wanted is more like

StringMap* oldMap = this->fStringMap;
int64_t oldCounter = this->fCounter;
if (doubleCAS(this, oldMap, oldCounter, oldMap, oldCounter+1))
    ...
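The "read once, then compute from the snapshot" rule can be sketched on a plain 64-bit counter. This uses std::atomic and compare_exchange_weak rather than the question's cmpxchg16b wrapper, so it only illustrates the pattern; the function names are hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical helper: increments the counter and returns the new value.
// The old value is read once; the expected and desired values passed to
// the CAS are both derived from that single snapshot.
inline int64_t increment_from_snapshot(std::atomic<int64_t>& counter) {
    int64_t old_value = counter.load();
    while (!counter.compare_exchange_weak(old_value, old_value + 1)) {
        // On failure, compare_exchange_weak writes the value it actually
        // found into old_value, so the next attempt uses a fresh snapshot.
    }
    return old_value + 1;
}

// Same pattern for the decrement side.
inline int64_t decrement_from_snapshot(std::atomic<int64_t>& counter) {
    int64_t old_value = counter.load();
    while (!counter.compare_exchange_weak(old_value, old_value - 1)) {
    }
    return old_value - 1;
}
```

For a lone counter, counter.fetch_add(1) does the same job in one call; the explicit loop is shown because a combined pointer-and-counter update has to be written as a CAS loop like this.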

The other obvious problem is that there's a data race between get and put. Consider the following sequence of events:

  • Thread A begins to execute get: it fetches fReadMapReference.load() and prepares to execute atomicIncrementAndGetPointer on that memory address.
  • Thread B finishes executing put: it deletes that memory address. (It is within its rights to do so, because the wrapper's reference count is still at zero.)
  • Thread A starts executing atomicIncrementAndGetPointer on the deleted memory address. If you're lucky, you segfault, but of course in practice you probably won't.

As explained in the blog post:

The garbage collection interface is omitted, but in real applications you would need to scan the hazard pointers before deleting a node.
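Hazard pointers are one way to close this race. A simpler alternative, which is not what the question or the blog post uses, is to let std::shared_ptr own the reference count and to publish new maps with the std::atomic_load / std::atomic_store free functions for shared_ptr. The sketch below assumes C++11 or later (these free functions are deprecated in C++20 in favor of std::atomic<std::shared_ptr<T>>), and the class name SharedPtrCowMap is hypothetical. Standard libraries typically implement these functions with an internal lock, so reads are safe but not necessarily lock-free.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

typedef std::unordered_map<std::string, std::string> StringMap;

// Hypothetical copy-on-write map: readers take a snapshot, writers publish a copy.
class SharedPtrCowMap {
public:
    SharedPtrCowMap() : map_(std::make_shared<const StringMap>()) {}

    // Returns true and fills `out` if the key is present.
    bool get(const std::string& key, std::string& out) const {
        // The snapshot keeps the map alive even if a writer replaces it now.
        std::shared_ptr<const StringMap> snapshot = std::atomic_load(&map_);
        StringMap::const_iterator it = snapshot->find(key);
        if (it == snapshot->end()) return false;
        out = it->second;
        return true;
    }

    void put(const std::string& key, const std::string& value) {
        std::lock_guard<std::mutex> lock(write_mutex_);  // serializes writers
        std::shared_ptr<StringMap> next =
            std::make_shared<StringMap>(*std::atomic_load(&map_));
        (*next)[key] = value;
        std::atomic_store(&map_, std::shared_ptr<const StringMap>(next));
        // The old map is freed when the last reader drops its snapshot.
    }

private:
    std::shared_ptr<const StringMap> map_;
    std::mutex write_mutex_;
};
```

Here the writer never waits for readers: the old map is destroyed by whichever thread releases the last shared_ptr to it.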

Quuxplusone answered Nov 08 '22 18:11


Another user has suggested a similar approach, but if you are compiling with gcc (and perhaps with clang), you could use the intrinsic __sync_add_and_fetch_4 which does something similar to what your assembly code does, and is likely much more portable. I have used it when I implemented refcounting in an Ada library (but the algorithm remains the same).

int __sync_add_and_fetch_4 (int* ptr, int value);
// increments the value pointed to by ptr by value, and returns the new value
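A minimal sketch of reference counting with these builtins, assuming GCC or Clang. It uses the generic (type-overloaded) forms __sync_add_and_fetch and __sync_sub_and_fetch rather than the _4 variant; the struct and function names are hypothetical.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical reference-counted object; starts with one owner.
struct RefCounted {
    int32_t refs;
    RefCounted() : refs(1) {}
};

// Atomically adds a reference and returns the new count.
inline int32_t ref_retain(RefCounted* obj) {
    return __sync_add_and_fetch(&obj->refs, 1);
}

// Atomically drops a reference; returns true when the count reaches zero,
// meaning the caller held the last reference and may free the object.
inline bool ref_release(RefCounted* obj) {
    return __sync_sub_and_fetch(&obj->refs, 1) == 0;
}
```

This makes each increment and decrement atomic, but on its own it does not stop a writer from deleting the object between a reader loading the pointer and calling ref_retain().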
manuBriot answered Nov 08 '22 17:11