Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

NamedPipeServerStream.ReadAsync() does not exit when CancellationToken requests cancell

When the NamedPipeServer stream reads any data from the pipe it does not react to CancellationTokenSource.Cancel()

Why is that?

How can I limit the time I'm waiting in the server for data from the client?

Code to reproduce:

static void Main(string[] args)
{
    Server();
    Clinet();
    Console.WriteLine("press [enter] to exit");
    Console.ReadLine();
}

private static async Task Server()
{
    using (var cancellationTokenSource = new CancellationTokenSource(1000))
    using (var server = new NamedPipeServerStream("test",
        PipeDirection.InOut,
        1,
        PipeTransmissionMode.Byte,
        PipeOptions.Asynchronous))
    {
        var cancellationToken = cancellationTokenSource.Token;
        await server.WaitForConnectionAsync(cancellationToken);
        await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
        var buffer = new byte[4];
        await server.ReadAsync(buffer, 0, 4, cancellationToken);
        Console.WriteLine("exit server");
    }
}

private static async Task Clinet()
{
    using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
    {
        var buffer = new byte[4];
        client.Connect();
        client.Read(buffer, 0, 4);
        await Task.Delay(5000);
        await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
        Console.WriteLine("client exit");
    }
}

Expected result:

exit server
<client throws exception cuz server closed pipe>

Actual result:

client exit
exit server

EDIT

The answer with CancelIo seems promising, and it does allow the server to end communication when the cancellation token is canceled. However, I don't understand why my "base scenario" stopped working when using ReadPipeAsync.

Here is the code, it includes 2 client functions:

  1. Clinet_ShouldWorkFine - a good client that reads/writes in time
  2. Clinet_ServerShouldEndCommunication_CuzClientIsSlow - a client too slow, server should end the communication

Expected:

  1. Clinet_ShouldWorkFine - execution ends without any excepiton
  2. Clinet_ServerShouldEndCommunication_CuzClientIsSlow - server closes the pipe, client throws exception

Actual:

  1. Clinet_ShouldWorkFine - server stops at first call to ReadPipeAsync, pipe is closed afer 1s, client throws exception
  2. Clinet_ServerShouldEndCommunication_CuzClientIsSlow - server closes the pipe, client throws exception

Why is Clinet_ShouldWorkFine not working when the server uses ReadPipeAsync

class Program
{
    static void Main(string[] args) {
        // in this case server should close the pipe cuz client is too slow
        try {
            var tasks = new Task[3];
            tasks[0] = Server();
            tasks[1] = tasks[0].ContinueWith(c => {
                Console.WriteLine($"Server exited, cancelled={c.IsCanceled}");
            });
            tasks[2] = Clinet_ServerShouldEndCommunication_CuzClientIsSlow();
            Task.WhenAll(tasks).Wait();
        }
        catch (Exception ex) {
            Console.WriteLine(ex);
        }

        // in this case server should exchange data with client fine
        try {
            var tasks = new Task[3];
            tasks[0] = Server();
            tasks[1] = tasks[0].ContinueWith(c => {
                Console.WriteLine($"Server exited, cancelled={c.IsCanceled}");
            });
            tasks[2] = Clinet_ShouldWorkFine();
            Task.WhenAll(tasks).Wait();
        }
        catch (Exception ex) {
            Console.WriteLine(ex);
        }

        Console.WriteLine("press [enter] to exit");
        Console.ReadLine();
    }

    private static async Task Server()
    {
        using (var cancellationTokenSource = new CancellationTokenSource(1000))
        using (var server = new NamedPipeServerStream("test",
            PipeDirection.InOut,
            1,
            PipeTransmissionMode.Byte,
            PipeOptions.Asynchronous))
        {
            var cancellationToken = cancellationTokenSource.Token;
            await server.WaitForConnectionAsync(cancellationToken);
            await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
            await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
            var buffer = new byte[4];
            var bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationToken);
            var bytes2 = await server.ReadPipeAsync(buffer, 0, 4, cancellationToken);
            Console.WriteLine("exit server");
        }
    }

    private static async Task Clinet_ShouldWorkFine()
    {
        using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
        {
            var buffer = new byte[4];
            client.Connect();
            client.Read(buffer, 0, 4);
            client.Read(buffer, 0, 4);
            await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
            await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
            Console.WriteLine("client exit");
        }
    }

    private static async Task Clinet_ServerShouldEndCommunication_CuzClientIsSlow()
    {
        using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
        {
            var buffer = new byte[4];
            client.Connect();
            client.Read(buffer, 0, 4);
            client.Read(buffer, 0, 4);
            await Task.Delay(5000);
            await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
            await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
            Console.WriteLine("client exit");
        }
    }
}

public static class AsyncPipeFixer {

    public static Task<int> ReadPipeAsync(this PipeStream pipe, byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
        if (cancellationToken.IsCancellationRequested) return Task.FromCanceled<int>(cancellationToken);
        var registration = cancellationToken.Register(() => CancelPipeIo(pipe));
        var async = pipe.BeginRead(buffer, offset, count, null, null);
        return new Task<int>(() => {
            try { return pipe.EndRead(async); }
            finally { registration.Dispose(); }
        }, cancellationToken);
    }

    private static void CancelPipeIo(PipeStream pipe) {
        // Note: no PipeStream.IsDisposed, we'll have to swallow
        try {
            CancelIo(pipe.SafePipeHandle);
        }
        catch (ObjectDisposedException) { }
    }
    [DllImport("kernel32.dll")]
    private static extern bool CancelIo(SafePipeHandle handle);

}
like image 978
inwenis Avatar asked Oct 03 '18 17:10

inwenis


1 Answers

.NET programmers get horribly in trouble with async/await when they write little test programs like this. It composes poorly, it is turtles all the way up. This program is missing the final turtle, the tasks are deadlocking. Nobody is taking care of letting the task continuations execute, as would normally happen in (say) a GUI app. Exceedingly hard to debug as well.

First make a minor change so the deadlock is completely visible:

   int bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationTokenSource.Token);

This takes a nasty little corner-case away, the Server method making it all the way to the "Server exited" message. A chronic problem with the Task class is that when a task completes or an awaited method finished synchronously then it will try to run the continuation directly. That happens to work in this program. By forcing it to obtain the async result, the deadlock is now obvious.


Next step is to fix Main() so these tasks can't deadlock anymore. That could look like this:

static void Main(string[] args) {
    try {
        var tasks = new Task[3];
        tasks[0] = Server();
        tasks[1] = tasks[0].ContinueWith(c => {
            Console.WriteLine($"Server exited, cancelled={c.IsCanceled}");
        });
        tasks[2] = Clinet();
        Task.WhenAll(tasks).Wait();
    }
    catch (Exception ex) {
        Console.WriteLine(ex);
    }
    Console.WriteLine("press [enter] to exit");
    Console.ReadLine();
}

Now we have a shot at getting ahead and actually fix the cancellation problem. The NamedPipeServerStream class does not implement ReadAsync itself, it inherits the method from one of its base classes, Stream. It has a ratty little detail that is completely under-documented. You can only see it when you stare at the framework source code. It can only detect cancellation when the cancel occurred before you call ReadAsync(). Once it the read is started it no longer can see a cancellation. The ultimate problem you are trying to fix.

It is a fixable problem, I have but a murky idea why Microsoft did not do this for PipeStreams. The normal way to force a BeginRead() method to complete early is to Dispose() the object, also the only way that Stream.ReadAsync() can be interrupted. But there is another way, on Windows it is possible to interrupt an I/O operation with CancelIo(). Let's make it an extension method:

using System;
using System.Threading.Tasks;
using System.Runtime.InteropServices;
using System.IO.Pipes;
using Microsoft.Win32.SafeHandles;

public static class AsyncPipeFixer {

    public static Task<int> ReadPipeAsync(this PipeStream pipe, byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
        if (cancellationToken.IsCancellationRequested) return Task.FromCanceled<int>(cancellationToken);
        var registration = cancellationToken.Register(() => CancelPipeIo(pipe));
        var async = pipe.BeginRead(buffer, offset, count, null, null);
        return new Task<int>(() => {
            try { return pipe.EndRead(async); }
            finally { registration.Dispose(); }
        }, cancellationToken);
    }

    private static void CancelPipeIo(PipeStream pipe) {
        // Note: no PipeStream.IsDisposed, we'll have to swallow
        try {
            CancelIo(pipe.SafePipeHandle);
        }
        catch (ObjectDisposedException) { }
    }
    [DllImport("kernel32.dll")]
    private static extern bool CancelIo(SafePipeHandle handle);

}

And finally tweak the server to use it:

    int bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationTokenSource.Token);

Do beware that this workaround is specific to Windows so can't work in a .NETCore program that targets a Unix flavor. Then consider the heavier hammer, call pipe.Close() in the CancelPipeIo() method.

like image 118
3 revs Avatar answered Oct 12 '22 22:10

3 revs