ConcurrentQueue<T> is a thread-safe collection class in C#. It was introduced in .NET Framework 4.0 along with the other concurrent collection classes, and it provides a thread-safe First-In-First-Out (FIFO) data structure. ConcurrentQueue<T> lives in the System.Collections.Concurrent namespace.
No. Keep your eye on the big picture: the point of PriorityQueue is iteration. You get the elements back in priority order, and iteration is never thread-safe.
A priority queue is a useful data structure in C# with many real-world applications, such as task scheduling and shortest-path search.
Note that both the ConcurrentStack and ConcurrentQueue classes are thread-safe and handle locking and synchronization internally. You can also copy the contents of a concurrent queue to an array by calling the ToArray() method, which returns a snapshot and leaves the queue unchanged.
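To make the ConcurrentQueue<T> behaviour described above concrete, here is a minimal sketch. The class name ConcurrentQueueDemo and the method FillAndSnapshot are made up for illustration; only Enqueue, TryDequeue, ToArray and Parallel.For come from the framework.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical demo class, not part of any library.
public static class ConcurrentQueueDemo
{
    // Enqueues 0..count-1 from several threads, snapshots the queue, then drains it.
    public static int[] FillAndSnapshot(int count)
    {
        var queue = new ConcurrentQueue<int>();

        // Parallel.For calls Enqueue from multiple threads; no external lock is needed.
        Parallel.For(0, count, i => queue.Enqueue(i));

        // ToArray copies the current contents; the queue itself is not modified.
        int[] snapshot = queue.ToArray();

        // TryDequeue returns false instead of throwing when the queue is empty.
        int drained = 0;
        while (queue.TryDequeue(out int item))
        {
            drained++;
        }

        Console.WriteLine("Drained " + drained + " items");
        return snapshot;
    }

    public static void Main()
    {
        int[] snapshot = FillAndSnapshot(1000);
        Console.WriteLine("Snapshot length: " + snapshot.Length);
    }
}
```

Because the producers run in parallel, the order of the items in the snapshot is not deterministic; only the count is.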
There is an implementation in the "Samples for Parallel Programming with the .NET Framework" on MSDN. See ParallelExtensionsExtras; the relevant source file is ConcurrentPriorityQueue.cs.
You may need to roll your own. A relatively easy way is to keep an array of regular queues, one per priority level, ordered from highest to lowest priority.
Basically, you insert each item into the queue for its priority. Then, on the consumer side, you walk the array from highest to lowest priority, check whether each queue is non-empty, and consume an entry from the first one that is.
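The array-of-queues idea can be sketched roughly as follows. This is only an illustration under a few assumptions: the class name BucketedPriorityQueue is hypothetical, index 0 is treated as the highest priority, and ConcurrentQueue<T> is swapped in for the "regular queues" so that each bucket is thread-safe without an explicit lock.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper: one FIFO bucket per priority level, index 0 = highest priority.
public class BucketedPriorityQueue<T>
{
    private readonly ConcurrentQueue<T>[] buckets;

    public BucketedPriorityQueue(int priorityLevels)
    {
        buckets = new ConcurrentQueue<T>[priorityLevels];
        for (int i = 0; i < priorityLevels; i++)
            buckets[i] = new ConcurrentQueue<T>();
    }

    // Producer side: insert into the bucket for the given priority.
    public void Enqueue(int priority, T item)
    {
        buckets[priority].Enqueue(item);
    }

    // Consumer side: scan from highest to lowest priority and take the first available item.
    public bool TryDequeue(out T item)
    {
        foreach (var bucket in buckets)
        {
            if (bucket.TryDequeue(out item))
                return true;
        }
        item = default(T);
        return false;
    }
}

public static class BucketedPriorityQueueDemo
{
    public static void Main()
    {
        var queue = new BucketedPriorityQueue<string>(3);
        queue.Enqueue(2, "low");
        queue.Enqueue(0, "high");
        queue.Enqueue(1, "medium");

        // Prints high, medium, low (one per line).
        while (queue.TryDequeue(out string s))
            Console.WriteLine(s);
    }
}
```

One caveat of this design: the scan is not atomic, so under contention a consumer can take a lower-priority item just as a higher-priority one arrives in a bucket it has already checked. If strict ordering matters, a single lock around both operations is the simpler choice.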
Maybe you can use my own implementation of a PriorityQueue. It implements a lot more than the usual push/pop/peek: features I added whenever I found the need for them. It also uses locks for concurrency.
Comments on the code are much appreciated :)
using System.Collections.Generic;
using System.Linq;

// Lower priority values are served first (SortedList keeps its keys in ascending order).
// Items with the same priority are served in FIFO order.
public class PriorityQueue<T> where T : class
{
    private readonly object lockObject = new object();
    private readonly SortedList<int, Queue<T>> list = new SortedList<int, Queue<T>>();

    // Total number of items across all priorities.
    public int Count
    {
        get
        {
            lock (this.lockObject)
            {
                return list.Sum(keyValuePair => keyValuePair.Value.Count);
            }
        }
    }

    public void Push(int priority, T item)
    {
        lock (this.lockObject)
        {
            if (!this.list.ContainsKey(priority))
                this.list.Add(priority, new Queue<T>());
            this.list[priority].Enqueue(item);
        }
    }

    // Removes and returns the first item of the lowest priority value, or null if empty.
    public T Pop()
    {
        lock (this.lockObject)
        {
            if (this.list.Count > 0)
            {
                T obj = this.list.First().Value.Dequeue();
                // Empty queues are removed so First() always points at a non-empty one.
                if (this.list.First().Value.Count == 0)
                    this.list.Remove(this.list.First().Key);
                return obj;
            }
        }
        return null;
    }

    // Removes and returns the first item with the given priority, or null if there is none.
    public T PopPriority(int priority)
    {
        lock (this.lockObject)
        {
            if (this.list.ContainsKey(priority))
            {
                T obj = this.list[priority].Dequeue();
                if (this.list[priority].Count == 0)
                    this.list.Remove(priority);
                return obj;
            }
        }
        return null;
    }

    // Removes and returns every item with the given priority.
    public IEnumerable<T> PopAllPriority(int priority)
    {
        List<T> ret = new List<T>();
        lock (this.lockObject)
        {
            // The lock is re-entrant, so calling PopPriority from here is safe.
            while (this.list.ContainsKey(priority) && this.list[priority].Count > 0)
                ret.Add(PopPriority(priority));
        }
        return ret;
    }

    // Returns the next item without removing it, or null if empty.
    public T Peek()
    {
        lock (this.lockObject)
        {
            if (this.list.Count > 0)
                return this.list.First().Value.Peek();
        }
        return null;
    }

    // Returns a copy of all items in priority order without removing them.
    public IEnumerable<T> PeekAll()
    {
        List<T> ret = new List<T>();
        lock (this.lockObject)
        {
            foreach (KeyValuePair<int, Queue<T>> keyValuePair in list)
                ret.AddRange(keyValuePair.Value.AsEnumerable());
        }
        return ret;
    }

    // Removes and returns all items in priority order.
    public IEnumerable<T> PopAll()
    {
        List<T> ret = new List<T>();
        lock (this.lockObject)
        {
            while (this.list.Count > 0)
                ret.Add(Pop());
        }
        return ret;
    }
}
Well, 7 years have passed, but for posterity, I would like to answer with my own implementation.
Documentation: Optionally awaitable simple to use Concurrent Priority Queue
Source code: GitHub
NuGet package