We have implemented a message queue using the C# Queue class. We know there is only ONE consumer taking messages off the queue for processing in a while loop, and only ONE producer putting messages onto the queue.

We have a lock on the message queue to make sure the consumer and producer cannot access it simultaneously. My question is: is that lock necessary? If the Queue increments its Count property AFTER an item is actually added, and the consumer checks Count before retrieving, the consumer should get a complete message item even without the lock. Right? Then we would not face a partial-message issue, and we could get rid of the lock?

That lock slows the system down, and occasionally we see the retrieving thread blocked for a while because we have a very heavy producer.
EDIT: Unfortunately we are using .NET 3.5.
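For reference, a minimal sketch of the pattern described above (class and member names are illustrative assumptions, not the actual code):

```csharp
using System;
using System.Collections.Generic;

// Sketch of the locked single-producer/single-consumer queue from the
// question. One thread calls Produce, another polls TryConsume in a loop.
class LockedMessageQueue
{
    private readonly Queue<string> _queue = new Queue<string>();
    private readonly object _sync = new object();

    // Producer thread calls this.
    public void Produce(string message)
    {
        lock (_sync)
        {
            _queue.Enqueue(message);
        }
    }

    // Consumer thread polls this in its while loop; checks Count before
    // dequeuing, as described in the question.
    public bool TryConsume(out string message)
    {
        lock (_sync)
        {
            if (_queue.Count > 0)
            {
                message = _queue.Dequeue();
                return true;
            }
        }
        message = null;
        return false;
    }
}
```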
The real problem is that the queue's internal data structures may be mutated while an enqueue or dequeue is in progress, and during that period the data structure is in an indeterminate state from the point of view of another thread.

For example, an enqueue may require the internal data structures to be expanded: a new structure is created and the old items are copied from the old structure to the new one. This process involves many steps, and at any point during it another thread accessing the queue would see incomplete, dangerous state.

Therefore, you have to lock around enqueue/dequeue to make these operations appear logically atomic.

You may try the new ConcurrentQueue class in .NET 4.0, which may have better performance characteristics since it uses a non-locking algorithm.
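A sketch of that lock-free alternative, assuming .NET 4.0 is available (it is not on the .NET 3.5 mentioned in the question):

```csharp
using System;
using System.Collections.Concurrent;

class Program
{
    static void Main()
    {
        // ConcurrentQueue<T> needs no external lock: Enqueue and
        // TryDequeue are each atomic with respect to other threads.
        var queue = new ConcurrentQueue<string>();

        // Producer side.
        queue.Enqueue("message-1");

        // Consumer side: TryDequeue returns false when the queue is
        // empty, so no separate Count check is needed.
        string item;
        if (queue.TryDequeue(out item))
        {
            Console.WriteLine(item); // message-1
        }
    }
}
```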
The lock is necessary if you're using Queue<T>. You could easily remove it by replacing that with ConcurrentQueue<T>; however, you may want to consider simplifying this code further by replacing it with a BlockingCollection<T>.

That would allow your consumer to eliminate the lock and the while/Count checking, and just do a single foreach over collection.GetConsumingEnumerable(). The producer can eliminate the lock and add items as needed. It would also easily allow you to scale up to multiple producers, since you mentioned you have a "very heavy producer" at the moment.
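A sketch of that approach (requires .NET 4.0; variable names are illustrative):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        // Wraps a ConcurrentQueue<T> by default; no external lock needed.
        var messages = new BlockingCollection<string>();

        var consumer = Task.Factory.StartNew(() =>
        {
            // Blocks while empty; the loop ends once CompleteAdding()
            // has been called and the collection drains.
            foreach (string msg in messages.GetConsumingEnumerable())
            {
                Console.WriteLine("processed: " + msg);
            }
        });

        // Producer: just Add. Multiple producer threads could call
        // Add concurrently without any extra synchronization.
        messages.Add("message-1");
        messages.Add("message-2");
        messages.CompleteAdding();

        consumer.Wait();
    }
}
```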
See ConcurrentQueue.
No, this won't work consistently... why?

Let's disassemble the two methods we would be calling concurrently from two threads (one reader and one writer):
public T Dequeue()
{
    if (this._size == 0)
    {
        ThrowHelper.ThrowInvalidOperationException(ExceptionResource.InvalidOperation_EmptyQueue);
    }
    T local = this._array[this._head];
    this._array[this._head] = default(T);
    this._head = (this._head + 1) % this._array.Length;
    this._size--;
    this._version++;
    return local;
}

public void Enqueue(T item)
{
    if (this._size == this._array.Length)
    {
        int capacity = (int) ((this._array.Length * 200L) / 100L);
        if (capacity < (this._array.Length + 4))
        {
            capacity = this._array.Length + 4;
        }
        this.SetCapacity(capacity);
    }
    this._array[this._tail] = item;
    this._tail = (this._tail + 1) % this._array.Length;
    this._size++;
    this._version++;
}
Given the above code, three of the fields are safe if (and only if) there is adequate capacity in the queue: _array, _head, and _tail are each either unmodified or modified in only one of the two methods above.

The reason you cannot remove the lock() is that both methods modify _size and _version. Though arguably the collision on _version could be ignored, the collision on _size would cause undesired and unpredictable behavior.
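To make the _size collision concrete, here is a deterministic simulation of the classic lost-update interleaving: _size-- and _size++ are each a read-modify-write, and the sketch below (using a plain local standing in for the field) replays the interleaving where both threads read before either writes:

```csharp
using System;

class Program
{
    static void Main()
    {
        int size = 5;            // stands in for Queue<T>._size

        // Interleaving: both threads read the field before either writes.
        int consumerRead = size; // consumer begins its _size--
        int producerRead = size; // producer begins its _size++

        size = consumerRead - 1; // consumer writes 4
        size = producerRead + 1; // producer writes 6: the decrement is lost

        // One enqueue plus one dequeue should leave size at 5.
        Console.WriteLine(size); // 6
    }
}
```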