I'm trying to write a fixed-size queue, but I get deadlocks and other multithreading problems. I want to use Interlocked.CompareExchange to avoid locks. But this code doesn't work as expected: it just wipes the entire queue. What am I doing wrong here?
public class FixedSizedQueue<T> : IEnumerable<T>
{
    readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();

    public int Limit { get; set; }

    public FixedSizedQueue(int limit)
    {
        Limit = limit;
    }

    public void Enqueue(T obj)
    {
        _queue.Enqueue(obj);
        if (_queue.Count <= Limit)
            return;
        int count = _queue.Count;
        if (_queue.Count != Interlocked.CompareExchange(ref count, count, _queue.Count))
        {
            T overflow;
            while (_queue.TryDequeue(out overflow))
            {
            }
        }
    }

    public T[] ToArray()
    {
        return _queue.ToArray();
    }

    public IEnumerator<T> GetEnumerator()
    {
        return _queue.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
Maybe I just need another thread that trims the queue...
Interlocked.CompareExchange is meaningless on the stack variable count, since it is only accessed from a single thread. I guess you tried to use this method on _queue.Count, but that failed to compile because Count is a property, not a simple variable. So you need to define a counter field in your class.
public class FixedSizedQueue<T> : IEnumerable<T>
{
    readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    int CountShadow = 0; // Counter used for the capacity check.

    public int Limit { get; set; }

    public FixedSizedQueue(int limit)
    {
        Limit = limit;
    }

    public void Enqueue(T obj)
    {
        /* Update the shadow counter first to enforce the constraint. */
        int count = CountShadow;
        while (true)
        {
            if (count >= Limit) return; // Adding the element would violate the constraint.
            int countOld = Interlocked.CompareExchange(ref CountShadow, count + 1, count);
            if (countOld == count) break; // Successful update.
            count = countOld;
        }
        _queue.Enqueue(obj); // This will update the real counter.
    }

    ...
}
Also, you need to write your own setter for the Limit property, one that maintains the invariant CountShadow <= Limit. Or just forbid the user from setting that property after the object's construction.
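For completeness, here is one way the elided members might be filled in. This is only a sketch of the idea above: the get-only Limit and the TryDequeue method (which releases a reserved slot by decrementing CountShadow, so the queue can accept new items after consumers remove old ones) are my own additions, not part of the original answer.

using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Threading;

public class FixedSizedQueue<T> : IEnumerable<T>
{
    readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    int CountShadow = 0; // Shadow counter used only for the capacity check.

    // Get-only: forbidding writes after construction trivially preserves CountShadow <= Limit.
    public int Limit { get; }

    public FixedSizedQueue(int limit)
    {
        if (limit < 1) throw new ArgumentOutOfRangeException(nameof(limit));
        Limit = limit;
    }

    public void Enqueue(T obj)
    {
        // Reserve a slot in the shadow counter before touching the real queue.
        int count = CountShadow;
        while (true)
        {
            if (count >= Limit) return; // Full: silently drop the item, as in the answer above.
            int countOld = Interlocked.CompareExchange(ref CountShadow, count + 1, count);
            if (countOld == count) break; // Reservation succeeded.
            count = countOld;             // Lost the race; retry with the fresh value.
        }
        _queue.Enqueue(obj);
    }

    // Hypothetical dequeue counterpart: releases the reserved slot so that
    // future Enqueue calls are allowed again.
    public bool TryDequeue(out T obj)
    {
        bool success = _queue.TryDequeue(out obj);
        if (success) Interlocked.Decrement(ref CountShadow);
        return success;
    }

    public T[] ToArray() => _queue.ToArray();
    public IEnumerator<T> GetEnumerator() => _queue.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}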
Tsyvarev's approach is clever and valid, but there is another way to limit the size of the queue with the Interlocked class. Instead of spinning with Interlocked.CompareExchange until the current thread wins the optimistic race, you can simply increment the counter field and then immediately decrement it in case the maximum limit has been exceeded. Here is an implementation of this idea:
public class ConcurrentBoundedQueue<T> : IEnumerable<T>
{
    private readonly ConcurrentQueue<T> _queue;
    private readonly int _boundedCapacity;
    private volatile int _approximateCount;

    public ConcurrentBoundedQueue(int boundedCapacity)
    {
        if (boundedCapacity < 1)
            throw new ArgumentOutOfRangeException(nameof(boundedCapacity));
        _queue = new();
        _boundedCapacity = boundedCapacity;
        _approximateCount = 0;
    }

    public int BoundedCapacity => _boundedCapacity;
    public int Count => _queue.Count;

    public bool TryEnqueue(T item)
    {
        if (_approximateCount >= _boundedCapacity) return false;
        if (Interlocked.Increment(ref _approximateCount) > _boundedCapacity)
        {
            Interlocked.Decrement(ref _approximateCount);
            return false;
        }
        _queue.Enqueue(item);
        return true;
    }

    public bool TryDequeue(out T item)
    {
        bool success = _queue.TryDequeue(out item);
        if (success) Interlocked.Decrement(ref _approximateCount);
        return success;
    }

    public T[] ToArray() => _queue.ToArray();
    public IEnumerator<T> GetEnumerator() => _queue.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
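A quick usage sketch (hypothetical, just to illustrate the API): many producers race to fill the queue, and the count never exceeds the bounded capacity.

// Requires: using System; using System.Threading.Tasks;
var queue = new ConcurrentBoundedQueue<int>(boundedCapacity: 100);

Parallel.For(0, 1000, i =>
{
    if (!queue.TryEnqueue(i))
    {
        // The queue is full; here the item is simply dropped.
    }
});

Console.WriteLine(queue.Count); // Never greater than 100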
According to my experiments, this approach is slightly faster than Tsyvarev's under intense activity and contention. Functionally the two approaches seem to be identical: they both enforce the bounded-capacity policy, and I can't see any difference in how they behave. They are also both faster than wrapping a normal Queue<T> and protecting it with the lock statement.
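For reference, a lock-protected Queue<T> wrapper with the same TryEnqueue/TryDequeue semantics would look roughly like this (an illustrative sketch; the class name is made up):

using System.Collections.Generic;

public class LockedBoundedQueue<T> // hypothetical name, for illustration only
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly object _locker = new object();
    private readonly int _boundedCapacity;

    public LockedBoundedQueue(int boundedCapacity) => _boundedCapacity = boundedCapacity;

    public bool TryEnqueue(T item)
    {
        lock (_locker)
        {
            if (_queue.Count >= _boundedCapacity) return false;
            _queue.Enqueue(item);
            return true;
        }
    }

    public bool TryDequeue(out T item)
    {
        lock (_locker)
        {
            if (_queue.Count > 0) { item = _queue.Dequeue(); return true; }
            item = default;
            return false;
        }
    }
}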
It should be noted that the functionality offered by the ConcurrentBoundedQueue<T> class is also offered out of the box by the built-in BlockingCollection<T>. The TryEnqueue method corresponds to TryAdd, and TryDequeue to TryTake. These APIs internally use Interlocked operations, in a way similar to Tsyvarev's solution. According to my experiments, using the BlockingCollection<T> for this purpose has considerable overhead, which makes it even slower than a simple lock-protected Queue<T>.
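For comparison, a minimal sketch of the same drop-when-full usage with the built-in BlockingCollection<T>:

using System.Collections.Concurrent;

var queue = new BlockingCollection<int>(boundedCapacity: 100);

bool added = queue.TryAdd(42);            // Returns false immediately when the collection is full.
bool taken = queue.TryTake(out int item); // Returns false immediately when the collection is empty.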