Is there a C++ implementation (source code) of the "optimistic approach to lock-free FIFO queues" algorithm?
How about this lfqueue
This is a cross-platform, thread-safe queue with unlimited enqueue capacity. It has been tested with multiple dequeuers, multiple enqueuers, and mixed enqueue/dequeue threads. It guarantees memory safety.
For example
/* Usage sketch for the lfqueue C API: initialise a queue, enqueue one
 * heap-allocated int, dequeue it again, then destroy the queue.
 * This is a fragment: it has to live inside a function returning int, and the
 * counter `i` is not declared here and must be supplied by the surrounding code. */
int* int_data;
lfqueue_t my_queue;
/* Give up if initialisation reports failure (-1). */
if (lfqueue_init(&my_queue) == -1)
return -1;
/** Producer side: wrap this scope in other threads **/
/* The queue is handed a pointer, so the payload is allocated on the heap. */
int_data = (int*) malloc(sizeof(int));
assert(int_data != NULL);
*int_data = i++;
/* Enqueue: retry for as long as lfqueue_enq returns -1
 * (presumably "could not enqueue right now" - confirm against the lfqueue docs). */
while (lfqueue_enq(&my_queue, int_data) == -1) {
printf("ENQ Full ?\n");
}
/** Consumer side: wrap this scope in other threads **/
/* Dequeue: spin until lfqueue_deq hands back a non-NULL pointer. */
while ( (int_data = lfqueue_deq(&my_queue)) == NULL) {
printf("DEQ EMPTY ..\n");
}
// printf("%d\n", *(int*) int_data );
/* The dequeued pointer is the one allocated with malloc above; release it here. */
free(int_data);
/** End **/
/* Tear the queue down once it is no longer used. */
lfqueue_destroy(&my_queue);
To round out the answer given by greyfade, which is based on the last part of the article at http://www.drdobbs.com/high-performance-computing/212201163, the optimized code (with some modifications to suit my naming and coding conventions) would be: `
/**
 * Multi-producer / multi-consumer FIFO queue built on a singly linked list
 * with a dummy head node.
 *
 * Producers serialise on `producerLock` and consumers on `consumerLock`, so a
 * push and a pop can proceed at the same time but two pushes (or two pops)
 * cannot. Both flags are taken by spinning on `InterlockedFlag::set(true)`
 * (presumably an atomic exchange returning the previous value - confirm
 * against the InterlockedFlag definition).
 *
 * Every element is stored as a heap-allocated copy of the pushed value; the
 * queue owns those copies until they are popped or the queue is destroyed.
 *
 * Relies on `AtomicPtr`, `InterlockedFlag` and `CACHE_LINE_SIZE`, which are
 * defined outside this block. The `pad*` members separate the hot fields by
 * CACHE_LINE_SIZE bytes so producers and consumers do not contend on the same
 * cache line.
 *
 * @tparam T element type; must be copy-constructible (push) and
 *           copy-assignable (pop).
 */
template <typename T> class LFQueue {
private:
    /** List node; owns the heap copy of its element through `value`. */
    struct LFQNode {
        explicit LFQNode( T* val ) : value(val), next(nullptr) { }
        T* value;                  // owned element copy; null in the dummy head
        AtomicPtr<LFQNode> next;   // written by a producer, read by a consumer
        char pad[CACHE_LINE_SIZE - sizeof(T*) - sizeof(AtomicPtr<LFQNode>)];
    };

    char pad0[CACHE_LINE_SIZE];
    LFQNode* first;                 // for one consumer at a time
    char pad1[CACHE_LINE_SIZE - sizeof(LFQNode*)];
    InterlockedFlag consumerLock;   // shared among consumers
    char pad2[CACHE_LINE_SIZE - sizeof(InterlockedFlag)];
    LFQNode* last;                  // for one producer at a time
    char pad3[CACHE_LINE_SIZE - sizeof(LFQNode*)];
    InterlockedFlag producerLock;   // shared among producers
    char pad4[CACHE_LINE_SIZE - sizeof(InterlockedFlag)];

public:
    /** Creates an empty queue holding only the dummy head node. */
    LFQueue() {
        first = last = new LFQNode( nullptr );   // dummy node, no divider
        producerLock = consumerLock = false;
    }

    // The queue owns raw node and element pointers; a member-wise copy would
    // free them twice, so copying is disabled.
    LFQueue( const LFQueue& ) = delete;
    LFQueue& operator=( const LFQueue& ) = delete;

    /**
     * Releases every remaining node together with the element it still owns.
     * Must not run while other threads are using the queue.
     */
    ~LFQueue() {
        while( first != nullptr ) {
            LFQNode* tmp = first;
            first = tmp->next;
            delete tmp->value;   // un-popped element copy; no-op for the dummy (null)
            delete tmp;
        }
    }

    /**
     * Removes the oldest element, if any.
     *
     * @param result receives a copy of the dequeued element on success and is
     *               left untouched when the queue is empty.
     * @return true if an element was dequeued, false if the queue was empty.
     */
    bool pop( T& result ) {
        while( consumerLock.set(true) )
        { }                                  // acquire exclusivity
        if( first->next != nullptr ) {       // queue is non-empty
            LFQNode* oldFirst = first;
            first = first->next;             // this node becomes the new dummy
            T* value = first->value;         // take the element out
            first->value = nullptr;          // of the node
            consumerLock = false;            // release exclusivity
            result = *value;                 // copy outside the critical section
            delete value;                    // clean up the element copy
            delete oldFirst;                 // and the retired dummy node
            return true;
        }
        consumerLock = false;                // release exclusivity
        return false;                        // queue was empty
    }

    /**
     * Appends a copy of `t` to the back of the queue.
     *
     * @param t value to enqueue; copied onto the heap.
     * @return always true.
     */
    bool push( const T& t ) {
        // Allocate off to the side, before taking the lock. The node stores a
        // T*, so the value has to be copied to the heap first.
        LFQNode* tmp = new LFQNode( new T( t ) );
        while( producerLock.set(true) )
        { }                                  // acquire exclusivity
        last->next = tmp;                    // A: publish the new item
        last = tmp;                          // B: not "last->next"
        producerLock = false;                // release exclusivity
        return true;
    }
};
`
Another question: how do you define CACHE_LINE_SIZE? It varies between CPUs, right?
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With