Is it acceptable to use TaskCompletionSource as a WaitHandle substitute?

My code handles a TCP connection to a remote host, with a ConcurrentQueue to store outgoing messages. It's intended to run in a single thread. The lifetime of the connection is contained within RunAsync while a separate object contains the "public state" of the connection:

class PublicState
{
    internal readonly ConcurrentQueue<Message> OutgoingMessageQueue = new ConcurrentQueue<Message>();
    internal TaskCompletionSource<Object> OutgoingMessageTcs = null;

    internal readonly TaskCompletionSource<Object> ConnectedTcs = new TaskCompletionSource<Object>();

    public void EnqueueMessages(IEnumerable<Message> messages)
    {
        foreach( Message m in messages ) this.OutgoingMessageQueue.Enqueue( m);
        if( this.OutgoingMessageTcs == null ) this.OutgoingMessageTcs = new TaskCompletionSource<Object>();
        this.OutgoingMessageTcs.SetResult( null );
    }
}

static async Task RunAsync(IPEndPoint endPoint, PublicState state)
{
    using( TcpClient tcp = new TcpClient() )
    {
        await tcp.ConnectAsync( endPoint.Address, endPoint.Port ).ConfigureAwait(false);

        Byte[] reusableBuffer = new Byte[ 4096 ];

        using( NetworkStream ns = tcp.GetStream() )
        {
            state.ConnectedTcs.SetResult( null );

            Task<Int32> nsReadTask = null;

            while( tcp.Connected )
            {
                if( !state.OutgoingMessageQueue.IsEmpty )
                {
                    await WriteMessagesAsync( ... ).ConfigureAwait( false );
                }

                if( ns.DataAvailable )
                {
                    await ReadMessagesAsync( ... ).ConfigureAwait( false );
                }

                // Wait for new data to arrive from remote host or for new messages to send:
                if( state.OutgoingMessageTcs == null ) state.OutgoingMessageTcs = new TaskCompletionSource<Object>();
                // Store the Task itself; ConfigureAwait belongs on the await below, not on the stored task.
                if( nsReadTask == null ) nsReadTask = ns.ReadAsync( reusableBuffer, 0, 0 );

                Task c = await Task.WhenAny( state.OutgoingMessageTcs.Task, nsReadTask ).ConfigureAwait( false );
                if( c == state.OutgoingMessageTcs.Task ) state.OutgoingMessageTcs = null;
                else if( c == nsReadTask ) nsReadTask = null;
            }
        }
    }
}

Used like this:

public static async Task Main(String[] args)
{
    PublicState state = new PublicState();
    // IPEndPoint.Parse (available from .NET Core 3.0) expects "address:port" in args[0].
    Task clientTask = Client.RunAsync( IPEndPoint.Parse( args[0] ), state );

    await state.ConnectedTcs.Task; // awaits until TCP connection is established

    // PublicState only defines EnqueueMessages(IEnumerable<Message>), so pass the batch in one call.
    state.EnqueueMessages( new[] { new Message("foo"), new Message("bar"), new Message("baz") } );

    await clientTask; // awaits until the TCP connection is closed
}

This code works, but I don't like it. TaskCompletionSource is meant to represent an actual Task or some kind of background operation, whereas I'm really using it as a kind of cheap EventWaitHandle. I'm not using EventWaitHandle because it's IDisposable (I don't want to risk leaking native resources) and it lacks a WaitAsync or WaitOneAsync method. I could use SemaphoreSlim (which is awaitable, but wraps an EventWaitHandle), but my code doesn't really represent a good use of a semaphore.

Is my use of TaskCompletionSource<T> acceptable, or is there a better way to "un-await" execution in RunAsync when an item is added to OutgoingMessageQueue?

Another reason I feel it's "wrong" is that TaskCompletionSource<T> can only be used once, then it needs replacing. I'm keen to avoid extraneous allocations.

Asked by Dai on Apr 09 '18 at 06:04


1 Answer

If I understood you correctly, the TPL Dataflow BufferBlock<T> might be what you need. The analog of your current Enqueue is Post, and you can receive the next value via the ReceiveAsync extension method.
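As a minimal sketch of those two calls in isolation (BufferBlockDemo and RoundTripAsync are illustrative names, not from the question; BufferBlock<T> comes from the System.Threading.Tasks.Dataflow package), a pending ReceiveAsync task stays incomplete until something is posted:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Illustrative demo class; not part of the question's code.
static class BufferBlockDemo
{
    // Starts a receive on an empty block, posts three items,
    // then collects them in FIFO order.
    public static async Task<List<string>> RoundTripAsync()
    {
        var queue = new BufferBlock<string>();

        // Nothing has been posted yet, so this task is still pending.
        Task<string> pending = queue.ReceiveAsync();
        if (pending.IsCompleted)
            throw new InvalidOperationException("Expected a pending receive.");

        // Post is synchronous; it returns false only if the block declines the item.
        queue.Post("foo");
        queue.Post("bar");
        queue.Post("baz");

        var received = new List<string>();
        // The pending receive is handed the first item.
        received.Add(await pending.ConfigureAwait(false));
        // Later receives complete as soon as an item is available.
        received.Add(await queue.ReceiveAsync().ConfigureAwait(false));
        received.Add(await queue.ReceiveAsync().ConfigureAwait(false));
        return received;
    }

    static async Task Main()
    {
        foreach (string s in await RoundTripAsync())
            Console.WriteLine(s);
    }
}
```

The pending task plays the role that OutgoingMessageTcs.Task plays in your loop, except that the block, not your code, decides when to complete it.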

So with BufferBlock your code becomes something like this:

class PublicState {
    internal readonly BufferBlock<Message> OutgoingMessageQueue = new BufferBlock<Message>();
    internal readonly TaskCompletionSource<Object> ConnectedTcs = new TaskCompletionSource<Object>();

    public void EnqueueMessage(Message message) {
        this.OutgoingMessageQueue.Post(message);
    }
}

static async Task RunAsync(IPEndPoint endPoint, PublicState state) {
    using (TcpClient tcp = new TcpClient()) {
        await tcp.ConnectAsync(endPoint.Address, endPoint.Port).ConfigureAwait(false);

        Byte[] reusableBuffer = new Byte[4096];

        using (NetworkStream ns = tcp.GetStream()) {
            state.ConnectedTcs.SetResult(null);

            Task<Int32> nsReadTask = null;
            Task<Message> newMessageTask = null;
            while (tcp.Connected) {
                // Wait for new data to arrive from remote host or for new messages to send:
                if (nsReadTask == null)
                    nsReadTask = ns.ReadAsync(reusableBuffer, 0, 0);
                if (newMessageTask == null)
                    newMessageTask = state.OutgoingMessageQueue.ReceiveAsync();
                var completed = await Task.WhenAny(nsReadTask, newMessageTask).ConfigureAwait(false);
                if (completed == newMessageTask) {
                    var result = await newMessageTask;
                    // do stuff
                    newMessageTask = null;
                }
                else {
                    var bytesRead = await nsReadTask;
                    nsReadTask = null;
                }
            }
        }
    }
}

As a bonus, this version is (I think) thread-safe, while your current version is not, because you are doing non-thread-safe things with OutgoingMessageTcs from potentially multiple threads (the thread running RunAsync and the thread of the EnqueueMessages caller).
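To make the race concrete: EnqueueMessages and RunAsync both read, replace, and null out OutgoingMessageTcs without synchronization, so a SetResult can land on a TaskCompletionSource that nobody awaits, or throw InvalidOperationException because that instance was already completed. If you did want to keep the TaskCompletionSource approach, something along these lines should close those gaps. AsyncSignal and AsyncSignalDemo are hypothetical names, not part of your code or of any library, and this is only a sketch of the resettable-signal idea, not a tested replacement:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: a resettable async signal built on TaskCompletionSource.
sealed class AsyncSignal
{
    private TaskCompletionSource<bool> _tcs = NewTcs();

    // RunContinuationsAsynchronously keeps awaiters from running inline inside Set().
    private static TaskCompletionSource<bool> NewTcs() =>
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    // Task that completes once Set() has been called since the last Reset().
    public Task WaitAsync() => Volatile.Read(ref _tcs).Task;

    // Safe from any thread, any number of times: TrySetResult never throws
    // when the task is already completed.
    public void Set() => Volatile.Read(ref _tcs).TrySetResult(true);

    // Re-arms the signal. The swap happens only if the current instance is
    // completed and nobody else has swapped it in the meantime.
    public void Reset()
    {
        TaskCompletionSource<bool> current = Volatile.Read(ref _tcs);
        if (current.Task.IsCompleted)
            Interlocked.CompareExchange(ref _tcs, NewTcs(), current);
    }
}

static class AsyncSignalDemo
{
    // Returns true if the signal was pending, completed after Set, and pending again after Reset.
    public static async Task<bool> RunAsync()
    {
        var signal = new AsyncSignal();
        Task wait = signal.WaitAsync();
        bool pendingBefore = !wait.IsCompleted;

        signal.Set();
        signal.Set(); // second call is a harmless no-op

        await wait.ConfigureAwait(false);
        signal.Reset();
        return pendingBefore && !signal.WaitAsync().IsCompleted;
    }

    static async Task Main() => Console.WriteLine(await RunAsync());
}
```

In the consumer loop, the order would matter: await WaitAsync(), call Reset(), and only then drain the queue, so that a message enqueued during the drain completes the fresh instance instead of being missed. It still allocates a new TaskCompletionSource per reset, which is one more reason I would reach for BufferBlock first.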

If for some reason you don't like BufferBlock, you can use AsyncCollection from the Nito.AsyncEx NuGet package in exactly the same way. Initialization becomes:

internal readonly AsyncCollection<Message> OutgoingMessageQueue = new AsyncCollection<Message>(new ConcurrentQueue<Message>());

And fetching:

if (newMessageTask == null)
   newMessageTask = state.OutgoingMessageQueue.TakeAsync();

Answered by Evk on Sep 19 '22 at 12:09