I have a MessagesManager
thread to which different threads may send messages and then this MessagesManager
thread is responsible to publish these messages inside SendMessageToTcpIP()
(start point of MessagesManager
thread ).
class MessagesManager : IMessageNotifier
{
//private
private readonly AutoResetEvent _waitTillMessageQueueEmptyARE = new AutoResetEvent(false);
private ConcurrentQueue<string> MessagesQueue = new ConcurrentQueue<string>();
public void PublishMessage(string Message)
{
MessagesQueue.Enqueue(Message);
_waitTillMessageQueueEmptyARE.Set();
}
public void SendMessageToTcpIP()
{
//keep waiting till a new message comes
while (MessagesQueue.Count() == 0)
{
_waitTillMessageQueueEmptyARE.WaitOne();
}
//Copy the Concurrent Queue into a local queue - keep dequeuing the item once it is inserts into the local Queue
Queue<string> localMessagesQueue = new Queue<string>();
while (!MessagesQueue.IsEmpty)
{
string message;
bool isRemoved = MessagesQueue.TryDequeue(out message);
if (isRemoved)
localMessagesQueue.Enqueue(message);
}
//Use the Local Queue for further processing
while (localMessagesQueue.Count() != 0)
{
TcpIpMessageSenderClient.ConnectAndSendMessage(localMessagesQueue.Dequeue().PadRight(80, ' '));
Thread.Sleep(2000);
}
}
}
The different threads (3-4) send their message by calling the PublishMessage(string Message)
(using same object to MessageManager). Once the message comes, I push that message into a concurrent queue and notifies the SendMessageToTcpIP()
by setting _waitTillMessageQueueEmptyARE.Set();
. Inside SendMessageToTcpIP()
, I am copying the message from the concurrent queue inside a local queue and then publish one by one.
QUESTIONS: Is it thread safe to do enqueuing and dequeuing in this way? Could there be some strange effects due to it?
While this is probably thread-safe, there are built-in classes in .NET to help with "many publishers one consumer" pattern, like BlockingCollection
. You can rewrite your class like this:
class MessagesManager : IDisposable {
// note that your ConcurrentQueue is still in play, passed to constructor
private readonly BlockingCollection<string> MessagesQueue = new BlockingCollection<string>(new ConcurrentQueue<string>());
public MessagesManager() {
// start consumer thread here
new Thread(SendLoop) {
IsBackground = true
}.Start();
}
public void PublishMessage(string Message) {
// no need to notify here, will be done for you
MessagesQueue.Add(Message);
}
private void SendLoop() {
// this blocks until new items are available
foreach (var item in MessagesQueue.GetConsumingEnumerable()) {
// ensure that you handle exceptions here, or whole thing will break on exception
TcpIpMessageSenderClient.ConnectAndSendMessage(item.PadRight(80, ' '));
Thread.Sleep(2000); // only if you are sure this is required
}
}
public void Dispose() {
// this will "complete" GetConsumingEnumerable, so your thread will complete
MessagesQueue.CompleteAdding();
MessagesQueue.Dispose();
}
}
.NET already provides ActionBlock< T> that allows posting messages to a buffer and processing them asynchronously. By default, only one message is processed at a time.
Your code could be rewritten as:
//In an initialization function
ActionBlock<string> _hmiAgent=new ActionBlock<string>(async msg=>{
TcpIpMessageSenderClient.ConnectAndSendMessage(msg.PadRight(80, ' '));
await Task.Delay(2000);
);
//In some other thread ...
foreach ( ....)
{
_hmiAgent.Post(someMessage);
}
// When the application closes
_hmiAgent.Complete();
await _hmiAgent.Completion;
ActionBlock offers many benefits - you can specify a limit to the number of items it can accept in a buffer and specify that multiple messages can be processed in parallel. You can also combine multiple blocks in a processing pipeline. In a desktop application, a message can be posted to a pipeline in response to an event, get processed by separate blocks and results posted to a final block that updates the UI.
Padding, for example, could be performed by an intermediary TransformBlock< TIn,TOut>. This transformation is trivial and the cost of using the block is greater than the method, but that's just an illustration:
//In an initialization function
TransformBlock<string> _hmiAgent=new TransformBlock<string,string>(
msg=>msg.PadRight(80, ' '));
ActionBlock<string> _tcpBlock=new ActionBlock<string>(async msg=>{
TcpIpMessageSenderClient.ConnectAndSendMessage());
await Task.Delay(2000);
);
var linkOptions=new DataflowLinkOptions{PropagateCompletion = true};
_hmiAgent.LinkTo(_tcpBlock);
The posting code doesn't change at all
_hmiAgent.Post(someMessage);
When the application terminates, we need to wait for the _tcpBlock to complete:
_hmiAgent.Complete();
await _tcpBlock.Completion;
Visual Studio 2015+ itself uses TPL Dataflow for such scenarios
Bar Arnon provides a better example in TPL Dataflow Is The Best Library You're Not Using, that shows how both synchronous and asynchronous methods can be used in a block.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With