
Creating a blocking Queue<T> in .NET?


That looks very unsafe (very little synchronization); how about something like:

class SizeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int maxSize;
    public SizeQueue(int maxSize) { this.maxSize = maxSize; }

    public void Enqueue(T item)
    {
        lock (queue)
        {
            while (queue.Count >= maxSize)
            {
                Monitor.Wait(queue);
            }
            queue.Enqueue(item);
            if (queue.Count == 1)
            {
                // wake up any blocked dequeue
                Monitor.PulseAll(queue);
            }
        }
    }
    public T Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            T item = queue.Dequeue();
            if (queue.Count == maxSize - 1)
            {
                // wake up any blocked enqueue
                Monitor.PulseAll(queue);
            }
            return item;
        }
    }
}

(edit)

In reality, you'd want a way to close the queue so that readers start exiting cleanly - perhaps something like a bool flag - if set, an empty queue just returns (rather than blocking):

bool closing;
public void Close()
{
    lock(queue)
    {
        closing = true;
        Monitor.PulseAll(queue);
    }
}
public bool TryDequeue(out T value)
{
    lock (queue)
    {
        while (queue.Count == 0)
        {
            if (closing)
            {
                value = default(T);
                return false;
            }
            Monitor.Wait(queue);
        }
        value = queue.Dequeue();
        if (queue.Count == maxSize - 1)
        {
            // wake up any blocked enqueue
            Monitor.PulseAll(queue);
        }
        return true;
    }
}
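
For completeness, here is one way the producer/consumer usage ends up looking with the Close/TryDequeue variant. This is only an illustrative sketch; the thread structure, item strings and the bound of 100 are not from the answer above:

using System;
using System.Threading;

var queue = new SizeQueue<string>(100);

var consumer = new Thread(() =>
{
    string line;
    // Blocks while the queue is empty; returns false once Close() has
    // been called and the queue has drained.
    while (queue.TryDequeue(out line))
    {
        Console.WriteLine(line);
    }
});
consumer.Start();

for (int i = 0; i < 1000; i++)
{
    queue.Enqueue("item " + i); // blocks while the queue is full
}

queue.Close();  // lets the consumer exit once the queue is empty
consumer.Join();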

Use the .NET 4 BlockingCollection: to enqueue, use Add(); to dequeue, use Take(). It internally uses the non-blocking ConcurrentQueue. More info here: Fast and Best Producer/consumer queue technique, and BlockingCollection vs ConcurrentQueue.
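
A minimal sketch of that Add/Take pattern; the bound of 100, the loop bodies, and the consumer thread are illustrative, not from this answer:

using System;
using System.Collections.Concurrent;
using System.Threading;

// Bounded to 100 items: Add blocks when the collection is full,
// Take (and GetConsumingEnumerable) blocks when it is empty.
var queue = new BlockingCollection<int>(boundedCapacity: 100);

var consumer = new Thread(() =>
{
    // Ends once CompleteAdding() has been called and the queue has drained.
    foreach (var item in queue.GetConsumingEnumerable())
        Console.WriteLine(item);
});
consumer.Start();

for (int i = 0; i < 1000; i++)
    queue.Add(i);   // blocks while 100 items are already pending

queue.CompleteAdding();
consumer.Join();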


"How can this be improved?"

Well, you need to look at every method in your class and consider what would happen if another thread was simultaneously calling that method or any other method. For example, you put a lock in the Remove method, but not in the Add method. What happens if one thread Adds at the same time as another thread Removes? Bad things.
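
To make that concrete, both methods need to acquire the same lock. A minimal sketch follows; the class, field, and method names here are illustrative, since the question's original code isn't reproduced above:

using System.Collections.Generic;

// Illustrative only: Add and Remove take the *same* lock, so one thread
// cannot mutate the inner list while another is removing from it.
class SafeCollection<T>
{
    private readonly List<T> items = new List<T>();
    private readonly object sync = new object();

    public void Add(T item)
    {
        lock (sync) { items.Add(item); }
    }

    public bool Remove(T item)
    {
        lock (sync) { return items.Remove(item); }
    }
}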

Also consider that a method can return a second object that provides access to the first object's internal data - for example, GetEnumerator. Imagine one thread is going through that enumerator, another thread is modifying the list at the same time. Not good.

A good rule of thumb is to make this simpler to get right by cutting down the number of methods in the class to the absolute minimum.

In particular, don't inherit another container class, because you will expose all of that class's methods, providing a way for the caller to corrupt the internal data, or to see partially complete changes to the data (just as bad, because the data appears corrupted at that moment). Hide all the details and be completely ruthless about how you allow access to them.

I'd strongly advise you to use off-the-shelf solutions - get a book about threading, or use a 3rd-party library. Otherwise, given what you're attempting, you're going to be debugging your code for a long time.

Also, wouldn't it make more sense for Remove to return an item (say, the one that was added first, as it's a queue), rather than the caller choosing a specific item? And when the queue is empty, perhaps Remove should also block.

Update: Marc's answer actually implements all these suggestions! :) But I'll leave this here as it may be helpful to understand why his version is such an improvement.


You can use BlockingCollection and ConcurrentQueue from the System.Collections.Concurrent namespace:

public class ProducerConsumerQueue<T> : BlockingCollection<T>
{
    /// <summary>
    /// Initializes a new instance of the ProducerConsumerQueue.
    /// Use Add/TryAdd for Enqueue/TryEnqueue and Take/TryTake for Dequeue/TryDequeue functionality.
    /// </summary>
    public ProducerConsumerQueue()
        : base(new ConcurrentQueue<T>())
    {
    }

    /// <summary>
    /// Initializes a new bounded instance of the ProducerConsumerQueue.
    /// Use Add/TryAdd for Enqueue/TryEnqueue and Take/TryTake for Dequeue/TryDequeue functionality.
    /// </summary>
    /// <param name="maxSize">Maximum number of items the queue can hold before Add blocks.</param>
    public ProducerConsumerQueue(int maxSize)
        : base(new ConcurrentQueue<T>(), maxSize)
    {
    }
}
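
Since the wrapper only pins the backing store to a ConcurrentQueue, usage is plain BlockingCollection. A sketch; the bound of 10, the strings, and the 100 ms timeout are illustrative:

var queue = new ProducerConsumerQueue<string>(maxSize: 10);

queue.Add("work item");          // blocks once 10 items are pending

string item = queue.Take();      // blocks until an item is available

string next;
if (queue.TryTake(out next, 100))
{
    // got an item within 100 ms
}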

I just knocked this up using the Reactive Extensions and remembered this question:

using System.Collections.Generic;
using System.Reactive.Linq;     // Rx's blocking GetEnumerator() operator
using System.Reactive.Subjects; // Subject<T>

public class BlockingQueue<T>
{
    private readonly Subject<T> _queue;
    private readonly IEnumerator<T> _enumerator;
    private readonly object _sync = new object();

    public BlockingQueue()
    {
        _queue = new Subject<T>();
        // Rx's blocking enumerator: MoveNext() waits for the next OnNext.
        _enumerator = _queue.GetEnumerator();
    }

    public void Enqueue(T item)
    {
        lock (_sync)
        {
            _queue.OnNext(item);
        }
    }

    public T Dequeue()
    {
        _enumerator.MoveNext();
        return _enumerator.Current;
    }
}

Not necessarily entirely safe, but very simple.
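
For what it's worth, usage looks like this; the background consumer thread here is illustrative, not part of the original answer:

using System;
using System.Threading;

// Dequeue blocks the calling thread (via the blocking enumerator)
// until a producer pushes an item with Enqueue.
var queue = new BlockingQueue<int>();

var consumer = new Thread(() =>
{
    while (true)
        Console.WriteLine(queue.Dequeue());
});
consumer.IsBackground = true;
consumer.Start();

queue.Enqueue(1);
queue.Enqueue(2);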


This is what I came up with for a thread-safe bounded blocking queue.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

public class BlockingBuffer<T>
{
    private Object t_lock;
    private Semaphore sema_NotEmpty;
    private Semaphore sema_NotFull;
    private T[] buf;

    private int getFromIndex;
    private int putToIndex;
    private int size;
    private int numItems;

    public BlockingBuffer(int Capacity)
    {
        if (Capacity <= 0)
            throw new ArgumentOutOfRangeException("Capacity", "Capacity must be larger than 0");

        t_lock = new Object();
        buf = new T[Capacity];
        sema_NotEmpty = new Semaphore(0, Capacity);
        sema_NotFull = new Semaphore(Capacity, Capacity);
        getFromIndex = 0;
        putToIndex = 0;
        size = Capacity;
        numItems = 0;
    }

    public void put(T item)
    {
        // Reserve a free slot; blocks while the buffer is full.
        sema_NotFull.WaitOne();
        lock (t_lock)
        {
            while (numItems == size)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            // Write at the tail of the circular buffer and wrap around.
            buf[putToIndex++] = item;

            if (putToIndex == size)
                putToIndex = 0;

            numItems++;

            Monitor.Pulse(t_lock);
        }
        // Signal consumers that an item is now available.
        sema_NotEmpty.Release();
    }

    public T take()
    {
        T item;

        // Wait for an available item; blocks while the buffer is empty.
        sema_NotEmpty.WaitOne();
        lock (t_lock)
        {
            while (numItems == 0)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            // Read from the head of the circular buffer and wrap around.
            item = buf[getFromIndex++];

            if (getFromIndex == size)
                getFromIndex = 0;

            numItems--;

            Monitor.Pulse(t_lock);
        }
        // Signal producers that a slot has been freed.
        sema_NotFull.Release();

        return item;
    }
}
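
Illustrative usage (the producer/consumer threads and the capacity of 8 are just an example, not part of the original post):

using System;
using System.Threading;

var buffer = new BlockingBuffer<int>(8);

var producer = new Thread(() =>
{
    for (int i = 0; i < 100; i++)
        buffer.put(i);                      // blocks while the buffer is full
});

var consumer = new Thread(() =>
{
    for (int i = 0; i < 100; i++)
        Console.WriteLine(buffer.take());   // blocks while the buffer is empty
});

producer.Start();
consumer.Start();
producer.Join();
consumer.Join();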

I haven't fully explored the TPL, but it might have something that fits your needs, or at the very least some Reflector fodder to snag some inspiration from.

Hope that helps.