
Producer/consumer pattern using threads and EventWaitHandle

I guess this is partly a code review request, but here is my implementation of the producer/consumer pattern. What I would like to know is whether there is any case in which the while loops in the ReceivingThread() or SendingThread() methods might stop executing. Please note that EnqueueSend(DataSendEnqeueInfo info) is called from multiple threads, and I probably can't use tasks here because I have to consume commands on a separate thread.

private Thread mReceivingThread;
private Thread mSendingThread;
private Queue<DataRecievedEnqeueInfo> mReceivingThreadQueue;
private Queue<DataSendEnqeueInfo> mSendingThreadQueue;
private readonly object mReceivingQueueLock = new object();
private readonly object mSendingQueueLock = new object();
private bool mIsRunning;
EventWaitHandle mRcWaitHandle;
EventWaitHandle mSeWaitHandle;

private void ReceivingThread()
{
    while (mIsRunning)
    {
        mRcWaitHandle.WaitOne();
        DataRecievedEnqeueInfo item = null;
        while (mReceivingThreadQueue.Count > 0)
        {
            lock (mReceivingQueueLock)
            {
                item = mReceivingThreadQueue.Dequeue();
            }
            ProcessReceivingItem(item);
        }
        mRcWaitHandle.Reset();
    }
}

private void SendingThread()
{
    while (mIsRunning)
    {
        mSeWaitHandle.WaitOne();
        while (mSendingThreadQueue.Count > 0)
        {
            DataSendEnqeueInfo item = null;
            lock (mSendingQueueLock)
            {
                item = mSendingThreadQueue.Dequeue();
            }
            ProcessSendingItem(item);
        }
        mSeWaitHandle.Reset();
    }
}

internal void EnqueueRecevingData(DataRecievedEnqeueInfo info)
{
    lock (mReceivingQueueLock)
    {
        mReceivingThreadQueue.Enqueue(info);
        mRcWaitHandle.Set();
    }
}

public void EnqueueSend(DataSendEnqeueInfo info)
{
    lock (mSendingQueueLock)
    {
        mSendingThreadQueue.Enqueue(info);
        mSeWaitHandle.Set();
    }
}

P.S. The idea here is that I am using WaitHandles to put the threads to sleep when the queue is empty, and to signal them to resume when new items are enqueued.

UPDATE: I am just going to leave this here for people who might be trying to implement the producer/consumer pattern using the TPL or tasks: https://blogs.msdn.microsoft.com/benwilli/2015/09/10/tasks-are-still-not-threads-and-async-is-not-parallel/

Sushant Poojary asked Dec 24 '22 20:12


1 Answer

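To answer the question directly: the loops will not exit on their own while mIsRunning is true, but a consumer can stall with items still in its queue. Count is read outside the lock, and the Reset() after the inner loop can wipe out a Set() from a producer that enqueued just after the final Count check, so the thread goes back to sleep until the next item arrives. Use a BlockingCollection instead of the Queue, EventWaitHandle and lock objects; it handles both the locking and the blocking for you: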

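using System.Collections.Concurrent;
using System.Threading;

public class DataInfo { }

private Thread mReceivingThread;
private Thread mSendingThread;

// Thread-safe queue; Take() blocks while it is empty
private readonly BlockingCollection<DataInfo> queue = new BlockingCollection<DataInfo>();

private readonly CancellationTokenSource receivingCts = new CancellationTokenSource();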

private void ReceivingThread()
{
    try
    {
        while (!receivingCts.IsCancellationRequested)
        {
            // This will block until an item is added to the queue or the cancellation token is cancelled
            DataInfo item = queue.Take(receivingCts.Token);

            ProcessReceivingItem(item);
        }
    }
    catch (OperationCanceledException)
    {
        // Cancellation was requested: leave the loop and let the thread exit
    }
}

internal void EnqueueRecevingData(DataInfo info)
{
    // When a new item is produced, just add it to the queue
    queue.Add(info);
}

// To cancel the receiving thread, cancel the token
private void CancelReceivingThread()
{
    receivingCts.Cancel();
}
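
If you would rather let the consumer drain the queue before it stops, instead of cancelling it mid-stream, something along these lines might work. This is only a minimal sketch, not a drop-in replacement: ProducerConsumerDemo, Run and the int payload are made-up names for illustration, and the producers are plain threads standing in for whatever calls EnqueueSend in your code. It relies on GetConsumingEnumerable() and CompleteAdding(), which are part of BlockingCollection.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical demo type; not part of the original code.
public static class ProducerConsumerDemo
{
    // Starts producerCount producer threads that each add itemsPerProducer
    // integers, plus one dedicated consumer thread. Returns how many items
    // the consumer processed.
    public static int Run(int producerCount, int itemsPerProducer)
    {
        int processed = 0;
        using (var queue = new BlockingCollection<int>())
        {
            var consumer = new Thread(() =>
            {
                // Blocks while the queue is empty; the loop ends once
                // CompleteAdding() has been called and the queue is drained.
                foreach (int item in queue.GetConsumingEnumerable())
                {
                    processed++; // only the consumer thread writes this
                }
            });
            consumer.Start();

            var producers = new Thread[producerCount];
            for (int p = 0; p < producerCount; p++)
            {
                producers[p] = new Thread(() =>
                {
                    for (int i = 0; i < itemsPerProducer; i++)
                    {
                        queue.Add(i); // safe to call from any thread
                    }
                });
                producers[p].Start();
            }

            foreach (Thread producer in producers)
            {
                producer.Join();
            }

            queue.CompleteAdding(); // no more items: lets the consumer exit
            consumer.Join();        // after this, processed is final
        }
        return processed;
    }

    public static void Main()
    {
        Console.WriteLine(Run(4, 1000)); // prints 4000
    }
}
```

The difference from the cancellation-token version is that nothing already queued is dropped on shutdown: the consumer thread only exits after every item that was added has been handed to it.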

Serge answered Dec 28 '22 05:12