Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

C# Await Multiple Events in Producer/Consumer

I'm implementing a data link layer using a producer/consumer pattern. The data link layer has its own thread and state machine to communicate the data link protocol over the wire (Ethernet, RS-232...). The interface to the physical layer is represented as a System.IO.Stream. Another thread writes messages to and reads messages from the data link object.

The data link object has an idle state that must wait for one of four conditions:

  1. A byte is received
  2. A message is available from the network thread
  3. The keep-alive timer has expired
  4. All communication was cancelled by the network layer

I'm having a difficult time figuring out the best way to do this without splitting up communication into a read/write thread (thereby significantly increasing the complexity). Here's how I can get 3 out of 4:

// Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token.
stream.ReadTimeout = 10000;
await stream.ReadAsync(buf, 0, 1, cts.Token);

or

BlockingCollection<byte[]> SendQueue = new ...;
...
// Check for a message from network layer. Timeout after 10 seconds.
// Monitor cancellation token.
SendQueue.TryTake(out msg, 10000, cts.Token);

What should I do to block the thread, waiting for all four conditions? All recommendations are welcome. I'm not set on any architecture or data structures.

EDIT: ******** Thanks for the help everyone. Here's my solution ********

First I don't think there was an asynchronous implementation of the producer/consumer queue. So I implemented something similar to this stackoverflow post.

I needed an external and internal cancellation source to stop the consumer thread and cancel the intermediate tasks, respectively, similar to this article.

byte[] buf = new byte[1];
using (CancellationTokenSource internalTokenSource = new CancellationTokenSource())
{
    CancellationToken internalToken = internalTokenSource.Token;
    CancellationToken stopToken = stopTokenSource.Token;
    using (CancellationTokenSource linkedCts =
        CancellationTokenSource.CreateLinkedTokenSource(stopToken, internalToken))
    {
        CancellationToken ct = linkedCts.Token;
        Task<int> readTask = m_stream.ReadAsync(buf, 0, 1, ct);
        Task<byte[]> msgTask = m_sendQueue.DequeueAsync(ct);
        Task keepAliveTask = Task.Delay(m_keepAliveTime, ct);

        // Wait for at least one task to complete
        await Task.WhenAny(readTask, msgTask, keepAliveTask);

        // Next cancel the other tasks
        internalTokenSource.Cancel();
        try {
            await Task.WhenAll(readTask, msgTask, keepAliveTask);
        } catch (OperationCanceledException e) {
            if (e.CancellationToken == stopToken)
                throw;
        }

        if (msgTask.IsCompleted)
            // Send the network layer message
        else if (readTask.IsCompleted)
            // Process the byte from the physical layer
        else
            Contract.Assert(keepAliveTask.IsCompleted);
            // Send a keep alive message
    }
}
like image 882
Brian Heilig Avatar asked Mar 02 '16 14:03

Brian Heilig


2 Answers

I would go with your option two, waiting for any of the 4 conditions to happen. Assuming you have the 4 tasks as awaitable methods already:

var task1 = WaitForByteReceivedAsync();
var task2 = WaitForMessageAvailableAsync();
var task3 = WaitForKeepAliveTimerAsync();
var task4 = WaitForCommunicationCancelledAsync();

// now gather them
IEnumerable<Task<bool>> theTasks = new List<IEnumerable<Task<bool>>>{
task1, task2, task3, task4
};

// Wait for any of the things to complete
var result = await Task.WhenAny(theTasks);

The code above will resume immediately after the first task completes, and ignore the other 3.

Note:

In the documentation for WhenAny, it says:

The returned task will always end in the RanToCompletion state with its Result set to the first task to complete. This is true even if the first task to complete ended in the Canceled or Faulted state.

So make sure to do that final check before trusting what happened:

if(result.Result.Result == true) 
... // First Result is the task, the second is the bool that the task returns
like image 169
Pedro G. Dias Avatar answered Sep 29 '22 16:09

Pedro G. Dias


In this case, I would only use cancellation tokens for cancellation. A repeated timeout like a keep-alive timer is better represented as a timer.

So, I would model this as three cancelable tasks. First, the cancellation token:

All communication was cancelled by the network layer

CancellationToken token = ...;

Then, three concurrent operations:

A byte is received

var readByteTask = stream.ReadAsync(buf, 0, 1, token);

The keep-alive timer has expired

var keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token);

A message is available from the network thread

This one is a bit trickier. Your current code uses BlockingCollection<T>, which is not async-compatible. I recommend switching to TPL Dataflow's BufferBlock<T> or my own AsyncProducerConsumerQueue<T>, either of which can be used as async-compatible producer/consumer queues (meaning that the producer can be sync or async, and the consumer can be sync or async).

BufferBlock<byte[]> SendQueue = new ...;
...
var messageTask = SendQueue.ReceiveAsync(token);

Then you can use Task.WhenAny to determine which of these tasks completed:

var completedTask = await Task.WhenAny(readByteTask, keepAliveTimerTask, messageTask);

Now, you can retrieve results by comparing completedTask to the others and awaiting them:

if (completedTask == readByteTask)
{
  // Throw an exception if there was a read error or cancellation.
  await readByteTask;
  var byte = buf[0];
  ...
  // Continue reading
  readByteTask = stream.ReadAsync(buf, 0, 1, token);
}
else if (completedTask == keepAliveTimerTask)
{
  // Throw an exception if there was a cancellation.
  await keepAliveTimerTask;
  ...
  // Restart keepalive timer.
  keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token);
}
else if (completedTask == messageTask)
{
  // Throw an exception if there was a cancellation (or the SendQueue was marked as completed)
  byte[] message = await messageTask;
  ...
  // Continue reading
  messageTask = SendQueue.ReceiveAsync(token);
}
like image 34
Stephen Cleary Avatar answered Sep 29 '22 16:09

Stephen Cleary