Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

ArgumentException in producer-consumer queue in C#

enter image description hereI have a producer/consumer queue as following but I am getting ArgumentWException.

Following is the code:

public class ProducerConsumer<T> where T : class
    {
        #region Private Variables
        private Thread _workerThread;
        private readonly Queue<T> _workQueue;
        private  object _enqueueItemLocker = new object();
        private  object _processRecordLocker = new object();
        private readonly Action<T> _workCallbackAction;
        private AutoResetEvent _workerWaitSignal;
        #endregion

        #region Constructor
        public ProducerConsumer(Action<T> action)
        {
            _workQueue = new Queue<T>();
            _workCallbackAction = action;

        }
        #endregion
        #region Private Methods 
        private void ProcessRecord()
        {
            while (true)
            {               
                T workItemToBeProcessed = default(T);
                bool hasSomeWorkItem = false;
                lock (_processRecordLocker)
                {
                    hasSomeWorkItem = _workQueue.Count > 0;

                    if (hasSomeWorkItem)
                    {
                        workItemToBeProcessed = _workQueue.Dequeue();
                        if (workItemToBeProcessed == null)
                        {
                            return;
                        }
                    }
                }
                if (hasSomeWorkItem)
                {
                    if (_workCallbackAction != null)
                    {
                        _workCallbackAction(workItemToBeProcessed);
                    }
                }
                else
                {
                    _workerWaitSignal.WaitOne();
                }
            }
        }
        #endregion

        #region Public Methods
        /// <summary>
        /// Enqueues work item in the queue.
        /// </summary>
        /// <param name="workItem">The work item.</param>
        public void EnQueueWorkItem(T workItem)
        {
            lock (_enqueueItemLocker)
            {               
                _workQueue.Enqueue(workItem);

                if (_workerWaitSignal == null)
                {
                    _workerWaitSignal = new AutoResetEvent(false);
                }

                _workerWaitSignal.Set();
            }
        }
        /// <summary>
        /// Stops the processer, releases wait handles.
        /// </summary>
        /// <param name="stopSignal">The stop signal.</param>
        public void StopProcesser(AutoResetEvent stopSignal)
        {
            EnQueueWorkItem(null);

            _workerThread.Join();
            _workerWaitSignal.Close();
            _workerWaitSignal = null;

            if (stopSignal != null)
            {
                stopSignal.Set();
            }
        }
        /// <summary>
        /// Starts the processer, starts a new worker thread.
        /// </summary>
        public void StartProcesser()
        {
            if (_workerWaitSignal == null)
            {
                _workerWaitSignal = new AutoResetEvent(false);
            }
            _workerThread = new Thread(ProcessRecord) { IsBackground = true };
            _workerThread.Start();
        }
        #endregion
    }

Another class is:

public class Tester
{
    private readonly ProducerConsumer<byte[]> _proConsumer;
    public Tester()
    {
        _proConsumer = new ProducerConsumer<byte[]>(Display);
    }
    public void AddData(byte[] data)
    {
        try
        {
            _proConsumer.EnQueueWorkItem(recordData);
        }
        catch (NullReferenceException nre)
        {

        }
    }
    public void Start()
    {
        _proConsumer.StartProcesser();
    }

    private static object _recordLocker = new object();

    private void Display(byte[] recordByteStream)
    {
        try
        {
            lock (_recordLocker)
            {
                Console.WriteLine("Done with data:" + BitConverter.ToInt32(recordByteStream, 0));

            }

        }
        catch (Exception ex)
        {

        }

    }
}

And my main function:

class Program
    {
        private static Tester _recorder;
        static void Main(string[] args)
        {
            _recorder = new Tester();
            _recorder.StartRecording();

            for (int i = 0; i < 100000; i++)
            {
                _recorder.AddRecordData(BitConverter.GetBytes(i));              
            }

            Console.Read();
        }
    }

Any idea why do I get the exception and what should I do to avoid that ?

like image 316
Embedd_0913 Avatar asked Feb 18 '23 05:02

Embedd_0913


1 Answers

Your class, in its current implementation, is not thread-safe. You're using two different objects for your Enqueue (lock (_enqueueItemLocker)) and Dequeue (lock (_processRecordLocker)) calls, which creates a race condition in your Queue<T>.

You need to lock the same object instance on both calls in order to safely use the queue.

If you're using .NET 4, I'd recommend either using ConcurrentQueue<T> or BlockingCollection<T> instead, as these would eliminate the need for the locks in your code, since they're thread-safe.

like image 192
Reed Copsey Avatar answered Feb 21 '23 02:02

Reed Copsey