
Thread Safety in Concurrent Queue C#

I have a MessagesManager thread to which several other threads may send messages. The MessagesManager thread is then responsible for publishing these messages inside SendMessageToTcpIP() (the entry point of the MessagesManager thread).

class MessagesManager : IMessageNotifier
{
    //private
    private readonly AutoResetEvent _waitTillMessageQueueEmptyARE = new AutoResetEvent(false);
    private ConcurrentQueue<string> MessagesQueue = new ConcurrentQueue<string>(); 

    public void PublishMessage(string Message)
    {
        MessagesQueue.Enqueue(Message);
        _waitTillMessageQueueEmptyARE.Set();
    }

    public void SendMessageToTcpIP()
    {
        //keep waiting till a new message comes
        while (MessagesQueue.Count() == 0)
        {
            _waitTillMessageQueueEmptyARE.WaitOne();
        }

        //Copy the concurrent queue into a local queue - each item is dequeued as it is inserted into the local queue
        Queue<string> localMessagesQueue = new Queue<string>();

        while (!MessagesQueue.IsEmpty)
        {
            string message;
            bool isRemoved = MessagesQueue.TryDequeue(out message);
            if (isRemoved)
                localMessagesQueue.Enqueue(message);
        }

        //Use the Local Queue for further processing
        while (localMessagesQueue.Count() != 0)
        {
            TcpIpMessageSenderClient.ConnectAndSendMessage(localMessagesQueue.Dequeue().PadRight(80, ' '));
            Thread.Sleep(2000);
        }
    }
}

The different threads (3-4) send their messages by calling PublishMessage(string Message) on the same MessagesManager object. When a message arrives, I push it into a concurrent queue and notify SendMessageToTcpIP() by calling _waitTillMessageQueueEmptyARE.Set();. Inside SendMessageToTcpIP(), I copy the messages from the concurrent queue into a local queue and then publish them one by one.

QUESTIONS: Is it thread safe to do enqueuing and dequeuing in this way? Could there be some strange effects due to it?

skm asked Mar 30 '17 12:03


2 Answers

While this is probably thread-safe, .NET has built-in classes that help with the "many publishers, one consumer" pattern, such as BlockingCollection. You can rewrite your class like this:

class MessagesManager : IDisposable {
    // note that your ConcurrentQueue is still in play, passed to constructor
    private readonly BlockingCollection<string> MessagesQueue = new BlockingCollection<string>(new ConcurrentQueue<string>());

    public MessagesManager() {
        // start consumer thread here
        new Thread(SendLoop) {
            IsBackground = true
        }.Start();
    }

    public void PublishMessage(string Message) {
        // no need to notify here, will be done for you
        MessagesQueue.Add(Message);
    }

    private void SendLoop() {
        // this blocks until new items are available
        foreach (var item in MessagesQueue.GetConsumingEnumerable()) {
            // ensure that you handle exceptions here, or whole thing will break on exception
            TcpIpMessageSenderClient.ConnectAndSendMessage(item.PadRight(80, ' '));
            Thread.Sleep(2000); // only if you are sure this is required 
        }
    }

    public void Dispose() {            
        // this will "complete" GetConsumingEnumerable, so your thread will complete
        MessagesQueue.CompleteAdding();
        MessagesQueue.Dispose();
    }
}
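The comment about handling exceptions, and the order of calls in Dispose, can be sketched as follows. This is only a minimal sketch under assumptions: QueuedSender and its send delegate are hypothetical stand-ins for MessagesManager and TcpIpMessageSenderClient.ConnectAndSendMessage, and joining the worker before disposing the collection is one possible way to avoid disposing it while the consumer is still enumerating.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical stand-in for MessagesManager; "send" replaces the real TCP call.
sealed class QueuedSender : IDisposable
{
    private readonly BlockingCollection<string> _queue =
        new BlockingCollection<string>(new ConcurrentQueue<string>());
    private readonly Action<string> _send;
    private readonly Thread _worker;

    public QueuedSender(Action<string> send)
    {
        _send = send;
        _worker = new Thread(SendLoop) { IsBackground = true };
        _worker.Start();
    }

    // Safe to call from any number of threads.
    public void Publish(string message)
    {
        _queue.Add(message);
    }

    private void SendLoop()
    {
        // Blocks while the queue is empty; ends after CompleteAdding once drained.
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            try
            {
                _send(item.PadRight(80, ' '));
            }
            catch (Exception ex)
            {
                // A single failed send should not terminate the consumer loop.
                Console.Error.WriteLine("Send failed: " + ex.Message);
            }
        }
    }

    public void Dispose()
    {
        _queue.CompleteAdding(); // no more items; lets the loop finish
        _worker.Join();          // wait until the consumer has left the loop
        _queue.Dispose();
    }
}
```

With this shape, every message published before Dispose is handed to the send delegate before Dispose returns. Calling Publish after Dispose throws, because the collection no longer accepts items.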
Evk answered Sep 24 '22 08:09


.NET already provides ActionBlock<T> (part of TPL Dataflow), which allows posting messages to a buffer and processing them asynchronously. By default, only one message is processed at a time.

Your code could be rewritten as:

//In an initialization function
ActionBlock<string> _hmiAgent=new ActionBlock<string>(async msg=>{
        TcpIpMessageSenderClient.ConnectAndSendMessage(msg.PadRight(80, ' '));
        await Task.Delay(2000);
});

//In some other thread ...
foreach ( ....)
{
    _hmiAgent.Post(someMessage);
}

// When the application closes

_hmiAgent.Complete();
await _hmiAgent.Completion;

ActionBlock offers many benefits - you can specify a limit to the number of items it can accept in a buffer and specify that multiple messages can be processed in parallel. You can also combine multiple blocks in a processing pipeline. In a desktop application, a message can be posted to a pipeline in response to an event, get processed by separate blocks and results posted to a final block that updates the UI.
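The buffer limit and parallel processing mentioned above are both configured through ExecutionDataflowBlockOptions. The following is a rough sketch rather than a drop-in replacement: BoundedBlockSketch, RunAsync, the values 10 and 2, and the Task.Delay placeholder for the TCP send are all assumptions made for illustration. On .NET Framework, the System.Threading.Tasks.Dataflow NuGet package is needed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class BoundedBlockSketch
{
    // Pushes every message through a bounded ActionBlock and returns what was processed.
    public static async Task<ConcurrentBag<string>> RunAsync(string[] messages)
    {
        var processed = new ConcurrentBag<string>();

        var options = new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 10,       // the block holds at most 10 messages at a time
            MaxDegreeOfParallelism = 2  // up to 2 messages are handled concurrently
        };

        var block = new ActionBlock<string>(async msg =>
        {
            await Task.Delay(10);       // placeholder for the real send
            processed.Add(msg.PadRight(80, ' '));
        }, options);

        foreach (var msg in messages)
        {
            // SendAsync waits for free capacity; Post would return false on a full block.
            await block.SendAsync(msg);
        }

        block.Complete();
        await block.Completion;
        return processed;
    }
}
```

Note that with MaxDegreeOfParallelism greater than 1, messages are no longer guaranteed to be processed strictly one after another, so this setting only fits cases where ordering of the sends does not matter.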

Padding, for example, could be performed by an intermediary TransformBlock<TInput,TOutput>. The transformation is trivial and using a block for it costs more than a plain method call, but it serves as an illustration:

//In an initialization function
TransformBlock<string,string> _hmiAgent=new TransformBlock<string,string>(
    msg=>msg.PadRight(80, ' '));

ActionBlock<string> _tcpBlock=new ActionBlock<string>(async msg=>{
        TcpIpMessageSenderClient.ConnectAndSendMessage(msg);
        await Task.Delay(2000);
});

//Pass the options to LinkTo, otherwise completion is not propagated to _tcpBlock
var linkOptions=new DataflowLinkOptions{PropagateCompletion = true};
_hmiAgent.LinkTo(_tcpBlock, linkOptions);

The posting code doesn't change at all:

    _hmiAgent.Post(someMessage);

When the application terminates, we need to wait for the _tcpBlock to complete:

    _hmiAgent.Complete();
    await _tcpBlock.Completion;

Visual Studio 2015+ itself uses TPL Dataflow for such scenarios.

Bar Arnon provides a better example in TPL Dataflow Is The Best Library You're Not Using, which shows how both synchronous and asynchronous methods can be used in a block.

Panagiotis Kanavos answered Sep 20 '22 08:09