I have a producer/consumer queue as following but I am getting ArgumentWException
.
Following is the code:
public class ProducerConsumer<T> where T : class
{
#region Private Variables
private Thread _workerThread;
private readonly Queue<T> _workQueue;
private object _enqueueItemLocker = new object();
private object _processRecordLocker = new object();
private readonly Action<T> _workCallbackAction;
private AutoResetEvent _workerWaitSignal;
#endregion
#region Constructor
public ProducerConsumer(Action<T> action)
{
_workQueue = new Queue<T>();
_workCallbackAction = action;
}
#endregion
#region Private Methods
private void ProcessRecord()
{
while (true)
{
T workItemToBeProcessed = default(T);
bool hasSomeWorkItem = false;
lock (_processRecordLocker)
{
hasSomeWorkItem = _workQueue.Count > 0;
if (hasSomeWorkItem)
{
workItemToBeProcessed = _workQueue.Dequeue();
if (workItemToBeProcessed == null)
{
return;
}
}
}
if (hasSomeWorkItem)
{
if (_workCallbackAction != null)
{
_workCallbackAction(workItemToBeProcessed);
}
}
else
{
_workerWaitSignal.WaitOne();
}
}
}
#endregion
#region Public Methods
/// <summary>
/// Enqueues work item in the queue.
/// </summary>
/// <param name="workItem">The work item.</param>
public void EnQueueWorkItem(T workItem)
{
lock (_enqueueItemLocker)
{
_workQueue.Enqueue(workItem);
if (_workerWaitSignal == null)
{
_workerWaitSignal = new AutoResetEvent(false);
}
_workerWaitSignal.Set();
}
}
/// <summary>
/// Stops the processer, releases wait handles.
/// </summary>
/// <param name="stopSignal">The stop signal.</param>
public void StopProcesser(AutoResetEvent stopSignal)
{
EnQueueWorkItem(null);
_workerThread.Join();
_workerWaitSignal.Close();
_workerWaitSignal = null;
if (stopSignal != null)
{
stopSignal.Set();
}
}
/// <summary>
/// Starts the processer, starts a new worker thread.
/// </summary>
public void StartProcesser()
{
if (_workerWaitSignal == null)
{
_workerWaitSignal = new AutoResetEvent(false);
}
_workerThread = new Thread(ProcessRecord) { IsBackground = true };
_workerThread.Start();
}
#endregion
}
Another class is:
public class Tester
{
private readonly ProducerConsumer<byte[]> _proConsumer;
public Tester()
{
_proConsumer = new ProducerConsumer<byte[]>(Display);
}
public void AddData(byte[] data)
{
try
{
_proConsumer.EnQueueWorkItem(recordData);
}
catch (NullReferenceException nre)
{
}
}
public void Start()
{
_proConsumer.StartProcesser();
}
private static object _recordLocker = new object();
private void Display(byte[] recordByteStream)
{
try
{
lock (_recordLocker)
{
Console.WriteLine("Done with data:" + BitConverter.ToInt32(recordByteStream, 0));
}
}
catch (Exception ex)
{
}
}
}
And my main function:
class Program
{
private static Tester _recorder;
static void Main(string[] args)
{
_recorder = new Tester();
_recorder.StartRecording();
for (int i = 0; i < 100000; i++)
{
_recorder.AddRecordData(BitConverter.GetBytes(i));
}
Console.Read();
}
}
Any idea why do I get the exception and what should I do to avoid that ?
Your class, in its current implementation, is not thread-safe. You're using two different objects for your Enqueue
(lock (_enqueueItemLocker)
) and Dequeue
(lock (_processRecordLocker)
) calls, which creates a race condition in your Queue<T>
.
You need to lock the same object instance on both calls in order to safely use the queue.
If you're using .NET 4, I'd recommend either using ConcurrentQueue<T>
or BlockingCollection<T>
instead, as these would eliminate the need for the locks in your code, since they're thread-safe.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With