Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

C# How to coordinate read and write threads while reconnecting NetworkStream?

I have a thread writing requests to NetworkStream. Another thread is reading responses from this stream.

I want to make it fault tolerant. In case of network failure, I want NetworkStream to be disposed replaced by a brand new one.

I made the two threads handle IO/Socket exceptions. Each of them will try to re-establish the connection. I'm struggling to coordinate these two threads. I had to place lock sections making the code rather complicated and error-prone.

Is there a recommended way to implement this? Perhaps using a single thread but making Read or Write async?

like image 290
Igor Gatis Avatar asked Nov 10 '22 05:11

Igor Gatis


1 Answers

I find it easiest to have a single thread that handles coordination and writing and then handle reading using async. By including a CancellationTokenSource in an AsyncState object the reader code can signal the sending thread to restart the connection, when the receiver encounters an error (calling EndRead or if the stream completes). The coordination/writer sits in a loop that creates the connection and then loops consuming a BlockingCollection<T> of items to send. By using BlockingCollection.GetConsumingEnumerable(token) the sender can be cancelled when the reader encounters an error.

    private class AsyncState
    {
        public byte[] Buffer { get; set; }
        public NetworkStream NetworkStream { get; set; }
        public CancellationTokenSource CancellationTokenSource { get; set; }
    }

Once you've created the connection you can start the async reading process (which keeps calling itself as long as everything is working). Passing the buffer, the stream and the CancellationTokenSource in the state object:

var buffer = new byte[1];
stream.BeginRead(buffer, 0, 1, Callback,
   new AsyncState
   {
       Buffer = buffer,
       NetworkStream = stream,
       CancellationTokenSource = cts2
   });

After that you start reading from your output queue and writing to the stream until cancelled or a failure happens:

using (var writer = new StreamWriter(stream, Encoding.ASCII, 80, true))
{
    foreach (var item in this.sendQueue.GetConsumingEnumerable(cancellationToken))
    {
       ...

... and in the Callback you can check for failures and, if necessary, hit the CancellationTokenSource to signal the writer thread to restart the connection.

private void Callback(IAsyncResult ar)
{
  var state = (AsyncState)ar.AsyncState;

  if (ar.IsCompleted)
  {
    try
    {
        int bytesRead = state.NetworkStream.EndRead(ar);
        LogState("Post read ", state.NetworkStream);
    }
    catch (Exception ex)
    {
        Log.Warn("Exception during EndRead", ex);
        state.CancellationTokenSource.Cancel();
        return;
    }

    // Deal with the character received
    char c = (char)state.Buffer[0];

    if (c < 0)
    {
        Log.Warn("c < 0, stream closing");
        state.CancellationTokenSource.Cancel();
        return;
    }

    ... deal with the character here, building up a buffer and
    ... handing it out to the application when completed
    ... perhaps using Rx Subject<T> to make it easy to subscribe

    ... and finally ask for the next byte with the same Callback

    // Launch the next reader
    var buffer2 = new byte[1];
    var state2 = state.WithNewBuffer(buffer2);
    state.NetworkStream.BeginRead(buffer2, 0, 1, Callback, state2);
like image 107
Ian Mercer Avatar answered Nov 14 '22 22:11

Ian Mercer