Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is this lock-free .NET queue thread safe?

My question is, is the class included below for a single-reader single-writer queue class thread-safe? This kind of queue is called lock-free, even if it will block if the queue is filled. The data structure was inspired by Marc Gravell's implementation of a blocking queue here at StackOverflow.

The point of the structure is to allow a single thread to write data to the buffer, and another thread to read data. All of this needs to happen as quickly as possible.

A similar data structure is described in an article at DDJ by Herb Sutter, except the implementation is in C++. Another difference is that I use a vanilla linked list, I use a linked list of arrays.

Rather than just including a snippet of code I include the whole thing with comment with a permissive open source license (MIT License 1.0) in case anyone finds it useful, and wants to use it (as-is or modified).

This is related to other questions asked on Stack Overflow of how to create a blocking concurrent queues (see Creating a blockinq Queue in .NET and Thread-safe blocking queue implementation in .NET).

Here is the code:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Diagnostics;

namespace CollectionSandbox
{
    /// This is a single reader / singler writer buffered queue implemented
    /// with (almost) no locks. This implementation will block only if filled 
    /// up. The implementation is a linked-list of arrays.
    /// It was inspired by the desire to create a non-blocking version 
    /// of the blocking queue implementation in C# by Marc Gravell
    /// https://stackoverflow.com/questions/530211/creating-a-blocking-queuet-in-net/530228#530228
    class SimpleSharedQueue<T> : IStreamBuffer<T>
    {
        /// Used to signal things are no longer full
        ManualResetEvent canWrite = new ManualResetEvent(true);

        /// This is the size of a buffer 
        const int BUFFER_SIZE = 512;

        /// This is the maximum number of nodes. 
        const int MAX_NODE_COUNT = 100;

        /// This marks the location to write new data to.
        Cursor adder;

        /// This marks the location to read new data from.
        Cursor remover;

        /// Indicates that no more data is going to be written to the node.
        public bool completed = false;

        /// A node is an array of data items, a pointer to the next item,
        /// and in index of the number of occupied items 
        class Node
        {
            /// Where the data is stored.
            public T[] data = new T[BUFFER_SIZE];

            /// The number of data items currently stored in the node.
            public Node next;

            /// The number of data items currently stored in the node.
            public int count;

            /// Default constructor, only used for first node.
            public Node()
            {
                count = 0;
            }

            /// Only ever called by the writer to add new Nodes to the scene
            public Node(T x, Node prev)
            {
                data[0] = x;
                count = 1;

                // The previous node has to be safely updated to point to this node.
                // A reader could looking at the point, while we set it, so this should be 
                // atomic.
                Interlocked.Exchange(ref prev.next, this);
            }
        }

        /// This is used to point to a location within a single node, and can perform 
        /// reads or writers. One cursor will only ever read, and another cursor will only
        /// ever write.
        class Cursor
        {
            /// Points to the parent Queue
            public SimpleSharedQueue<T> q;

            /// The current node
            public Node node;

            /// For a writer, this points to the position that the next item will be written to.
            /// For a reader, this points to the position that the next item will be read from.
            public int current = 0;

            /// Creates a new cursor, pointing to the node
            public Cursor(SimpleSharedQueue<T> q, Node node)
            {
                this.q = q;
                this.node = node;
            }

            /// Used to push more data onto the queue
            public void Write(T x)
            {
                Trace.Assert(current == node.count);

                // Check whether we are at the node limit, and are going to need to allocate a new buffer.
                if (current == BUFFER_SIZE)
                {
                    // Check if the queue is full
                    if (q.IsFull())
                    {
                        // Signal the canWrite event to false
                        q.canWrite.Reset();

                        // Wait until the canWrite event is signaled 
                        q.canWrite.WaitOne();
                    }

                    // create a new node
                    node = new Node(x, node);
                    current = 1;
                }
                else
                {
                    // If the implementation is correct then the reader will never try to access this 
                    // array location while we set it. This is because of the invariant that 
                    // if reader and writer are at the same node: 
                    //    reader.current < node.count 
                    // and 
                    //    writer.current = node.count 
                    node.data[current++] = x;

                    // We have to use interlocked, to assure that we incremeent the count 
                    // atomicalluy, because the reader could be reading it.
                    Interlocked.Increment(ref node.count);
                }
            }

            /// Pulls data from the queue, returns false only if 
            /// there 
            public bool Read(ref T x)
            {
                while (true)
                {
                    if (current < node.count)
                    {
                        x = node.data[current++];
                        return true;
                    }
                    else if ((current == BUFFER_SIZE) && (node.next != null))
                    {
                        // Move the current node to the next one.
                        // We know it is safe to do so.
                        // The old node will have no more references to it it 
                        // and will be deleted by the garbage collector.
                        node = node.next;

                        // If there is a writer thread waiting on the Queue,
                        // then release it.
                        // Conceptually there is a "if (q.IsFull)", but we can't place it 
                        // because that would lead to a Race condition.
                        q.canWrite.Set();

                        // point to the first spot                
                        current = 0;

                        // One of the invariants is that every node created after the first,
                        // will have at least one item. So the following call is safe
                        x = node.data[current++];
                        return true;
                    }

                    // If we get here, we have read the most recently added data.
                    // We then check to see if the writer has finished producing data.
                    if (q.completed)
                        return false;

                    // If we get here there is no data waiting, and no flagging of the completed thread.
                    // Wait a millisecond. The system will also context switch. 
                    // This will allow the writing thread some additional resources to pump out 
                    // more data (especially if it iself is multithreaded)
                    Thread.Sleep(1);
                }
            }
        }

        /// Returns the number of nodes currently used.
        private int NodeCount
        {
            get
            {
                int result = 0;
                Node cur = null;
                Interlocked.Exchange<Node>(ref cur, remover.node);

                // Counts all nodes from the remover to the adder
                // Not efficient, but this is not called often. 
                while (cur != null)
                {
                    ++result;
                    Interlocked.Exchange<Node>(ref cur, cur.next);
                }
                return result;
            }
        }

        /// Construct the queue.
        public SimpleSharedQueue()
        {
            Node root = new Node();
            adder = new Cursor(this, root);
            remover = new Cursor(this, root);
        }

        /// Indicate to the reader that no more data is going to be written.
        public void MarkCompleted()
        {
            completed = true;
        }

        /// Read the next piece of data. Returns false if there is no more data. 
        public bool Read(ref T x)
        {
            return remover.Read(ref x);
        }

        /// Writes more data.
        public void Write(T x)
        {
            adder.Write(x);
        }

        /// Tells us if there are too many nodes, and can't add anymore.
        private bool IsFull()
        {
            return NodeCount == MAX_NODE_COUNT;  
        }
    }
}
like image 678
cdiggins Avatar asked Oct 09 '09 02:10

cdiggins


People also ask

Is STD::queue empty() thread safe?

Join Bytes to post your question to a community of 470,982 software developers and data experts. std::queue empty () is thread safe? .... instead of... .... .... actually isn't. Consider the situation where queue isn't empty on "if" but gets empty on "lock".

What is thread safe collection in net?

Thread-Safe Collections. The .NET Framework 4 introduces the System.Collections.Concurrent namespace, which includes several collection classes that are both thread-safe and scalable. Multiple threads can safely and efficiently add or remove items from these collections, without requiring additional synchronization in user code.

Is there a lock-free queue in Java for personal learning?

I was trying to create a lock-free queue implementation in Java, mainly for personal learning. The queue should be a general one, allowing any number of readers and/or writers concurrently.

When does the two-thread rule not apply to concurrentqueue?

When processing time is around 500 FLOPS (floating point operations) or more, then the two-thread rule does not apply to ConcurrentQueue<T>, which then has very good scalability. Queue<T> does not scale well in this scenario.


5 Answers

Microsoft Research CHESS should prove to be a good tool for testing your implementation.

like image 196
GregC Avatar answered Oct 21 '22 09:10

GregC


The presence of Sleep() makes a lock-free approach totally useless. The only reason to confront the complexities of a lock-free design is the need for absolute speed and to avoid the cost of Semaphores. The use of Sleep(1) defeats that purpose totally.

like image 25
Henk Holterman Avatar answered Oct 21 '22 08:10

Henk Holterman


Given that I can't find any reference that the Interlocked.Exchange does Read or Write blocks, I would say not. I would also question why you want to go lockless, as seldom gives enough benefits to counter it's complexity.

Microsoft had an excellent presentation at the 2009 GDC on this, and you can get the slides here.

like image 25
Languard Avatar answered Oct 21 '22 08:10

Languard


Beware of the double checked - single lock pattern (as in a link quoted above: http://www.yoda.arachsys.com/csharp/singleton.html)

Quoting verbatim from the "Modern C++ Design" by Andrei Alexandrescu

    Very experienced multithreaded programmers know that even the Double-Checked Locking pattern, although correct on paper, is not always correct in practice. In certain symmetric multiprocessor environments (the ones featuring the so-called relaxed memory model), the writes are committed to the main memory in bursts, rather than one by one. The bursts occur in increasing order of addresses, not in chronological order. Due to this rearranging of writes, the memory as seen by one processor at a time might look as if the operations are not performed in the correct order by another processor. Concretely, the assignment to pInstance_ performed by a processor might occur before the Singleton object has been fully initialized! Thus, sadly, the Double-Checked Locking pattern is known to be defective for such systems
like image 33
0xdky Avatar answered Oct 21 '22 08:10

0xdky


I suspect it is not thread safe - imagine the following scenario:

two threads enter cursor.Write. The first gets as far as line node = new Node(x, node); in the true half of the if (current == BUFFER_SIZE) statement (but let's also assume that current == BUFFER_SIZE) so when 1 gets added to current then another thread coming in would follow the other path through the if statement. Now imagine that thread 1 loses its time slice and thread 2 gets it, and proceeds to enter the if statement on the mistaken belief that the condition still held. It should have entered the other path.

I haven't run this code either, so I'm not sure if my assumptions are possible in this code, but if they are (i.e. entering cursor.Write from multiple threads when current == BUFFER_SIZE), then it may well be prone to concurrency errors.

like image 39
Andrew Matthews Avatar answered Oct 21 '22 07:10

Andrew Matthews