I'm implementing a data link layer using a producer/consumer pattern. The data link layer has its own thread and state machine to communicate the data link protocol over the wire (Ethernet, RS-232...). The interface to the physical layer is represented as a System.IO.Stream. Another thread writes messages to and reads messages from the data link object.
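The data link object has an idle state that must wait for one of four conditions:
A byte is received
A message is available from the network thread
The keep-alive timer has expired
All communication was cancelled by the network layer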
I'm having a difficult time figuring out the best way to do this without splitting up communication into a read/write thread (thereby significantly increasing the complexity). Here's how I can get 3 out of 4:
// Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token.
stream.ReadTimeout = 10000;
await stream.ReadAsync(buf, 0, 1, cts.Token);
or
BlockingCollection<byte[]> SendQueue = new ...;
...
// Check for a message from network layer. Timeout after 10 seconds.
// Monitor cancellation token.
SendQueue.TryTake(out msg, 10000, cts.Token);
What should I do to block the thread, waiting for all four conditions? All recommendations are welcome. I'm not set on any architecture or data structures.
EDIT: ******** Thanks for the help everyone. Here's my solution ********
First, I don't think there was an asynchronous implementation of the producer/consumer queue, so I implemented something similar to this Stack Overflow post.
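For readers who want a concrete picture of such a queue, here is a minimal sketch. It is not the code from the linked post; the class name AsyncQueue<T> and its members are made up for illustration, and it assumes a SemaphoreSlim counting the available items is good enough for a single consumer thread.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration only, not the implementation from the linked post.
public sealed class AsyncQueue<T>
{
    private readonly ConcurrentQueue<T> m_items = new ConcurrentQueue<T>();

    // The semaphore count tracks how many items are waiting to be dequeued.
    private readonly SemaphoreSlim m_available = new SemaphoreSlim(0);

    // Can be called from any thread (e.g. the network layer thread).
    public void Enqueue(T item)
    {
        m_items.Enqueue(item);
        m_available.Release();
    }

    // Completes when an item is available. If 'ct' is canceled first, the
    // wait throws OperationCanceledException and no item is removed.
    public async Task<T> DequeueAsync(CancellationToken ct)
    {
        await m_available.WaitAsync(ct).ConfigureAwait(false);
        T item;
        m_items.TryDequeue(out item); // Succeeds: the semaphore guarantees an item exists.
        return item;
    }
}
```

Because a canceled DequeueAsync does not consume an item, a pending dequeue can be canceled and reissued on the next pass without losing messages.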
I needed an external and internal cancellation source to stop the consumer thread and cancel the intermediate tasks, respectively, similar to this article.
byte[] buf = new byte[1];
using (CancellationTokenSource internalTokenSource = new CancellationTokenSource())
{
    CancellationToken internalToken = internalTokenSource.Token;
    CancellationToken stopToken = stopTokenSource.Token;
    using (CancellationTokenSource linkedCts =
        CancellationTokenSource.CreateLinkedTokenSource(stopToken, internalToken))
    {
        CancellationToken ct = linkedCts.Token;
        Task<int> readTask = m_stream.ReadAsync(buf, 0, 1, ct);
        Task<byte[]> msgTask = m_sendQueue.DequeueAsync(ct);
        Task keepAliveTask = Task.Delay(m_keepAliveTime, ct);
        // Wait for at least one task to complete
        await Task.WhenAny(readTask, msgTask, keepAliveTask);
        // Next cancel the other tasks
        internalTokenSource.Cancel();
        try {
            await Task.WhenAll(readTask, msgTask, keepAliveTask);
        } catch (OperationCanceledException) {
            // The tasks observe the linked token, so the exception never carries
            // stopToken itself; check the external source directly instead.
            if (stopToken.IsCancellationRequested)
                throw;
        }
        // After WhenAll every task is completed (possibly canceled),
        // so test for successful completion rather than IsCompleted.
        if (msgTask.Status == TaskStatus.RanToCompletion)
        {
            // Send the network layer message
        }
        else if (readTask.Status == TaskStatus.RanToCompletion)
        {
            // Process the byte from the physical layer
        }
        else
        {
            Contract.Assert(keepAliveTask.Status == TaskStatus.RanToCompletion);
            // Send a keep alive message
        }
    }
}
I would go with your option two, waiting for any of the 4 conditions to happen. Assuming you have the 4 tasks as awaitable methods already:
var task1 = WaitForByteReceivedAsync();
var task2 = WaitForMessageAvailableAsync();
var task3 = WaitForKeepAliveTimerAsync();
var task4 = WaitForCommunicationCancelledAsync();
// now gather them
IEnumerable<Task<bool>> theTasks = new List<Task<bool>> {
    task1, task2, task3, task4
};
// Wait for any of the things to complete
var result = await Task.WhenAny(theTasks);
The code above will resume immediately after the first task completes, and ignore the other 3.
Note:
In the documentation for WhenAny, it says:
The returned task will always end in the RanToCompletion state with its Result set to the first task to complete. This is true even if the first task to complete ended in the Canceled or Faulted state.
So make sure to do that final check before trusting what happened:
if (await result)
... // 'result' is the task that finished first; awaiting it yields its bool, or rethrows if it was canceled or faulted
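The snippet above assumes the four Wait...Async methods already exist. As one possible way to build the cancellation one (WaitForCommunicationCancelledAsync), a CancellationToken can be turned into a Task<bool> with a TaskCompletionSource. This is only a sketch; the class and method names below are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationTasks
{
    // Hypothetical helper: returns a task that completes with 'true'
    // once 'token' is canceled, so it can take part in Task.WhenAny.
    public static Task<bool> WhenCanceled(CancellationToken token)
    {
        var tcs = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        // If the token is already canceled, Register runs the callback immediately.
        token.Register(() => tcs.TrySetResult(true));
        return tcs.Task;
    }
}
```

The registration is never disposed here, so it stays alive for as long as the token's source does. For a long-lived token that is passed to this helper on every loop iteration, it would be better to create the task once and reuse it.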
In this case, I would only use cancellation tokens for cancellation. A repeated timeout like a keep-alive timer is better represented as a timer.
So, I would model this as three cancelable tasks. First, the cancellation token:
All communication was cancelled by the network layer
CancellationToken token = ...;
Then, three concurrent operations:
A byte is received
var readByteTask = stream.ReadAsync(buf, 0, 1, token);
The keep-alive timer has expired
var keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token);
A message is available from the network thread
This one is a bit trickier. Your current code uses BlockingCollection<T>, which is not async-compatible. I recommend switching to TPL Dataflow's BufferBlock<T> or my own AsyncProducerConsumerQueue<T>, either of which can be used as async-compatible producer/consumer queues (meaning that the producer can be sync or async, and the consumer can be sync or async).
BufferBlock<byte[]> SendQueue = new ...;
...
var messageTask = SendQueue.ReceiveAsync(token);
Then you can use Task.WhenAny to determine which of these tasks completed:
var completedTask = await Task.WhenAny(readByteTask, keepAliveTimerTask, messageTask);
Now, you can retrieve results by comparing completedTask to the others and awaiting them:
if (completedTask == readByteTask)
{
    // Throw an exception if there was a read error or cancellation.
    await readByteTask;
    byte received = buf[0]; // 'byte' is a C# keyword, so it cannot be used as a variable name
    ...
    // Continue reading
    readByteTask = stream.ReadAsync(buf, 0, 1, token);
}
else if (completedTask == keepAliveTimerTask)
{
    // Throw an exception if there was a cancellation.
    await keepAliveTimerTask;
    ...
    // Restart keepalive timer.
    keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token);
}
else if (completedTask == messageTask)
{
    // Throw an exception if there was a cancellation (or the SendQueue was marked as completed)
    byte[] message = await messageTask;
    ...
    // Continue receiving messages
    messageTask = SendQueue.ReceiveAsync(token);
}