
Concurrent Priority Queue in .NET 4.0


There is an implementation in the "Samples for Parallel Programming with the .NET Framework" on MSDN. See ParallelExtensionsExtras.

Direct link to source code for file ConcurrentPriorityQueue.cs
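
For readers who cannot reach the linked file, here is a minimal sketch of the general technique of guarding a binary min-heap with a single lock. It is not the code from ParallelExtensionsExtras; the class name `LockedPriorityQueue` and its members are hypothetical, and it assumes that a smaller key means higher priority.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration: a binary min-heap protected by one lock.
// The entry with the smallest priority key is dequeued first.
public class LockedPriorityQueue<TPriority, TValue>
    where TPriority : IComparable<TPriority>
{
    private readonly object sync = new object();
    private readonly List<KeyValuePair<TPriority, TValue>> heap =
        new List<KeyValuePair<TPriority, TValue>>();

    public int Count
    {
        get { lock (sync) { return heap.Count; } }
    }

    public void Enqueue(TPriority priority, TValue value)
    {
        lock (sync)
        {
            heap.Add(new KeyValuePair<TPriority, TValue>(priority, value));
            // Sift the new entry up until its parent is not larger.
            int i = heap.Count - 1;
            while (i > 0)
            {
                int parent = (i - 1) / 2;
                if (heap[parent].Key.CompareTo(heap[i].Key) <= 0) break;
                Swap(i, parent);
                i = parent;
            }
        }
    }

    // Returns false instead of throwing when the queue is empty.
    public bool TryDequeue(out KeyValuePair<TPriority, TValue> result)
    {
        lock (sync)
        {
            if (heap.Count == 0)
            {
                result = default(KeyValuePair<TPriority, TValue>);
                return false;
            }
            result = heap[0];
            int last = heap.Count - 1;
            heap[0] = heap[last];
            heap.RemoveAt(last);
            // Sift the moved entry down to restore the heap property.
            int i = 0;
            while (true)
            {
                int left = 2 * i + 1, right = left + 1, smallest = i;
                if (left < heap.Count && heap[left].Key.CompareTo(heap[smallest].Key) < 0) smallest = left;
                if (right < heap.Count && heap[right].Key.CompareTo(heap[smallest].Key) < 0) smallest = right;
                if (smallest == i) break;
                Swap(i, smallest);
                i = smallest;
            }
            return true;
        }
    }

    private void Swap(int a, int b)
    {
        KeyValuePair<TPriority, TValue> tmp = heap[a];
        heap[a] = heap[b];
        heap[b] = tmp;
    }
}
```

A single lock keeps the sketch simple but serializes all producers and consumers. A binary heap also does not preserve insertion order among entries with equal priority.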


You may need to roll your own. A relatively easy way would be to keep an array of regular queues, one per priority level, ordered from highest to lowest priority. For concurrent use, each queue needs its own synchronization (a lock, or a ConcurrentQueue<T> on .NET 4.0).

Basically, a producer inserts into the queue for the appropriate priority. A consumer then walks the array from highest to lowest priority, checks whether each queue is non-empty, and consumes an entry from the first one that is.
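
The array-of-queues idea can be sketched roughly as follows. This is only an illustration under a few assumptions: the class name `BucketedPriorityQueue` is hypothetical, index 0 is treated as the highest priority, and `ConcurrentQueue<T>` is swapped in for "regular queues" so that no explicit locking is needed.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical illustration: one ConcurrentQueue<T> per priority level.
// Index 0 is the highest priority; the number of levels is fixed up front.
public class BucketedPriorityQueue<T>
{
    private readonly ConcurrentQueue<T>[] buckets;

    public BucketedPriorityQueue(int priorityLevels)
    {
        if (priorityLevels <= 0)
            throw new ArgumentOutOfRangeException("priorityLevels");
        buckets = new ConcurrentQueue<T>[priorityLevels];
        for (int i = 0; i < buckets.Length; i++)
            buckets[i] = new ConcurrentQueue<T>();
    }

    // Producer side: append to the queue for the given priority level.
    public void Enqueue(int priority, T item)
    {
        if (priority < 0 || priority >= buckets.Length)
            throw new ArgumentOutOfRangeException("priority");
        buckets[priority].Enqueue(item);
    }

    // Consumer side: scan from highest to lowest priority and take
    // the first available item. Returns false if every queue is empty.
    public bool TryDequeue(out T item)
    {
        foreach (ConcurrentQueue<T> bucket in buckets)
        {
            if (bucket.TryDequeue(out item))
                return true;
        }
        item = default(T);
        return false;
    }
}
```

One trade-off to keep in mind: the scan is not atomic across queues, so an item enqueued at a high priority just after the consumer has passed that queue may be served after a lower-priority item. For many workloads that approximate ordering is acceptable.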


Maybe you can use my own implementation of a PriorityQueue. It implements a lot more than the usual push/pop/peek; I added features whenever I found a need for them. It also uses locks for concurrency.

Comments on the code are much appreciated :)

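using System.Collections.Generic;
using System.Linq;

// Thread-safe priority queue guarded by a single lock.
// Lower int keys are dequeued first; items with the same priority are FIFO.
public class PriorityQueue<T> where T : class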
{
    private readonly object lockObject = new object();
    private readonly SortedList<int, Queue<T>> list = new SortedList<int, Queue<T>>();

    public int Count
    {
        get
        {
            lock (this.lockObject)
            {
                return list.Sum(keyValuePair => keyValuePair.Value.Count);
            }
        }
    }

    public void Push(int priority, T item)
    {
        lock (this.lockObject)
        {
            if (!this.list.ContainsKey(priority))
                this.list.Add(priority, new Queue<T>());
            this.list[priority].Enqueue(item);
        }
    }
    public T Pop()
    {
        lock (this.lockObject)
        {
            if (this.list.Count > 0)
            {
                T obj = this.list.First().Value.Dequeue();
                if (this.list.First().Value.Count == 0)
                    this.list.Remove(this.list.First().Key);
                return obj;
            }
        }
        return null;
    }
    public T PopPriority(int priority)
    {
        lock (this.lockObject)
        {
            if (this.list.ContainsKey(priority))
            {
                T obj = this.list[priority].Dequeue();
                if (this.list[priority].Count == 0)
                    this.list.Remove(priority);
                return obj;
            }
        }
        return null;
    }
    public IEnumerable<T> PopAllPriority(int priority)
    {
        List<T> ret = new List<T>();
        lock (this.lockObject)
        {
            // Drain the whole bucket in FIFO order, then drop it from the list.
            Queue<T> queue;
            if (this.list.TryGetValue(priority, out queue))
            {
                ret.AddRange(queue);
                this.list.Remove(priority);
            }
        }
        return ret;
    }
    public T Peek()
    {
        lock (this.lockObject)
        {
            if (this.list.Count > 0)
                return this.list.First().Value.Peek();
        }
        return null;
    }
    public IEnumerable<T> PeekAll()
    {
        List<T> ret = new List<T>();
        lock (this.lockObject)
        {
            foreach (KeyValuePair<int, Queue<T>> keyValuePair in list)
                ret.AddRange(keyValuePair.Value.AsEnumerable());
        }
        return ret;
    }
    public IEnumerable<T> PopAll()
    {
        List<T> ret = new List<T>();
        lock (this.lockObject)
        {
            while (this.list.Count > 0)
                ret.Add(Pop());
        }
        return ret;
    }
}

Well, 7 years have passed, but for posterity I would like to answer with my own implementation.

Documentation: Optionally awaitable simple to use Concurrent Priority Queue

Source code: GitHub

NuGet package

  • Lock-Free,
  • Highly Concurrent,
  • generic in stored item type,
  • generic in priority type, but constrained to priorities represented by a .NET enum (strongly typed priority),
  • explicitly defined descending order of priorities during construction,
  • ability to detect items count and per priority level items count,
  • ability to dequeue - descending order of priorities,
  • ability to override dequeue priority level,
  • potentially awaitable,
  • potentially priority based awaitable,