Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Enabling Queue<T> with concurrency

I have a previous question which I have provided my solution; however, I don't have access to ConcurrentQueue<T> since I am on .Net 3.5. I need Queue<T> to allow concurrency. I read this question and seems to present a problem if an item is not in the queue and the threaded method tries to dequeue an item.

My task now is to determine whether I can derive my own concurrent Queue class. This is what I came up with:

public sealed class ConcurrentQueue : Queue<DataTable>
{
    public event EventHandler<TableQueuedEventArgs> TableQueued;
    private ICollection que;

    new public void Enqueue(DataTable Table)
    {
        lock (que.SyncRoot)
        {
            base.Enqueue(Table);
        }

        OnTableQueued(new TableQueuedEventArgs(Dequeue()));
    }

    //  this is where I think I will have a problem...
    new public DataTable Dequeue()
    {
        DataTable table;

        lock (que.SyncRoot)
        {
            table = base.Dequeue();
        }

        return table;
    }

    public void OnTableQueued(TableQueuedEventArgs table)
    {
        EventHandler<TableQueuedEventArgs> handler = TableQueued;

        if (handler != null)
        {
            handler(this, table);
        }
    }
}

So, when a DataTable is queued, the EventArgs will pass a dequeued table to the event subscriber. Will this implementation provide me with a thread-safe Queue?

like image 522
IAbstract Avatar asked Dec 29 '10 15:12

IAbstract


2 Answers

A quick trip to my favorite search engine revealed that my memory was correct; you can get the Task Parallel Library even on .NET 3.5. Also see The PFX team blog post on the subject, and the Reactive Extensions that you download in order to get at the desired System.Threading.dll.

like image 109
Domenic Avatar answered Oct 12 '22 21:10

Domenic


The fact you need to use new to hide methods from the base class is usually an indication that you should use composition rather than inheritance...

Here's a simple synchronized queue, which doesn't use inheritance but still relies on the behavior of the standard Queue<T>:

public class ConcurrentQueue<T> : ICollection, IEnumerable<T>
{
    private readonly Queue<T> _queue;

    public ConcurrentQueue()
    {
        _queue = new Queue<T>();
    }

    public IEnumerator<T> GetEnumerator()
    {
        lock (SyncRoot)
        {
            foreach (var item in _queue)
            {
                yield return item;
            }
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public void CopyTo(Array array, int index)
    {
        lock (SyncRoot)
        {
            ((ICollection)_queue).CopyTo(array, index);
        }
    }

    public int Count
    {
        get
        { 
            // Assumed to be atomic, so locking is unnecessary
            return _queue.Count;
        }
    }

    public object SyncRoot
    {
        get { return ((ICollection)_queue).SyncRoot; }
    }

    public bool IsSynchronized
    {
        get { return true; }
    }

    public void Enqueue(T item)
    {
        lock (SyncRoot)
        {
            _queue.Enqueue(item);
        }
    }

    public T Dequeue()
    {
        lock(SyncRoot)
        {
            return _queue.Dequeue();
        }
    }

    public T Peek()
    {
        lock (SyncRoot)
        {
            return _queue.Peek();
        }
    }

    public void Clear()
    {
        lock (SyncRoot)
        {
            _queue.Clear();
        }
    }
}
like image 42
Thomas Levesque Avatar answered Oct 12 '22 21:10

Thomas Levesque