My code handles a TCP connection to a remote host, with a ConcurrentQueue
to store outgoing messages. It's intended to run in a single thread. The lifetime of the connection is contained within RunAsync
while a separate object contains the "public state" of the connection:
class PublicState
{
internal readonly ConcurrentQueue<Message> OutgoingMessageQueue = new ConcurrentQueue<Message>();
internal TaskCompletionSource<Object> OutgoingMessageTcs = null;
internal readonly TaskCompletionSource<Object> ConnectedTcs = new TaskCompletionSource<Object>();
public void EnqueueMessages(IEnumerable<Message> messages)
{
foreach( Message m in messages ) this.OutgoingMessageQueue.Enqueue( m);
if( this.OutgoingMessageTcs == null ) this.OutgoingMessageTcs = new TaskCompletionSource<Object>();
this.OutgoingMessageTcs.SetResult( null );
}
}
static async Task RunAsync(IPEndPoint endPoint, PublicState state)
{
using( TcpClient tcp = new TcpClient() )
{
await tcp.ConnectAsync( endPoint.Address, endPoint.Port ).ConfigureAwait(false);
Byte[] reusableBuffer = new Byte[ 4096 ];
using( NetworkStream ns = tcp.GetStream() )
{
state.ConnectedTcs.SetResult( null );
Task<Int32> nsReadTask = null;
while( tcp.Connected )
{
if( !state.writeQueue.IsEmpty )
{
await WriteMessagesAsync( ... ).ConfigureAwait( false );
}
if( ns.DataAvailable )
{
await ReadMessagesAsync( ... ).ConfigureAwait( false );
}
// Wait for new data to arrive from remote host or for new messages to send:
if( state.OutgoingMessageTcs == null ) state.OutgoingMessageTcs = new TaskCompletionSource<Object>();
if( nsReadTask == null ) nsReadTask = ns.ReadAsync( reusableBuffer, 0, 0 ).ConfigureAwait( false );
Task c = await Task.WhenAny( state.OutgoingMessageTcs, nsReadTask ).ConfigureAwait( false );
if( c == state.OutgoingMessageTcs.Task ) state.OutgoingMessageTcs = null;
else if( c == nsReadTask ) nsReadTask = null;
}
}
}
}
Used like this:
public async Task Main(String[] args)
{
PublicState state = new PublicState();
Task clientTask = Client.RunAsync( new IPEndPoint(args[0]), state );
await state.ConnectedTcs.Task; // awaits until TCP connection is established
state.EnqueueMessage( new Message("foo") );
state.EnqueueMessage( new Message("bar") );
state.EnqueueMessage( new Message("baz") );
await clientTask; // awaits until the TCP connection is closed
}
This code works, but I don't like it: it feels like I'm using TaskCompletionSource
which is meant to represent an actual Task or some kind of background operation, whereas I'm really using TaskCompletionSource
as a kind of cheap EventWaitHandle
. I'm not using EventWaitHandle
because it's IDisposable
(I don't want to risk leaking native resources) and it lacks a WaitAsync
or WaitOneAsync
method. I could use SemaphoreSlim
(which is awaitable, but wraps an EventWaitHandle
) but my code doesn't really represent a good use of a semaphore.
Is my use of TaskCompletionSource<T>
acceptable, or is there a better way to "un-await" execution in RunAsync
when an item is added to OutgoingMessageQueue
?
Another reason I feel it's "wrong" is that TaskCompletionSource<T>
can only be used once, then it needs replacing. I'm keen to avoid extraneous allocations.
The TaskCompletionSource class is a great way to convert such code into a Task you can simply await . It's a bit of additional work, but the result is much easier to read and use. Be sure to take full advantage of TaskCompletionSource in your asynchronous C# code!
Is it safe to pass non-thread-safe objects created on one thread to another using TaskCompletionSource. SetResult()? Yes, as long as the object can be used on a different thread than the one it was created on (of course).
If I understood you correctly - TPL BufferBlock
might be what you need. Analog of current Enqueue
is Post
, and you can receive next value via ReceiveAsync
extension method.
So with BufferBlock
your code becomes something like this:
class PublicState {
internal readonly BufferBlock<Message> OutgoingMessageQueue = new BufferBlock<Message>();
internal readonly TaskCompletionSource<Object> ConnectedTcs = new TaskCompletionSource<Object>();
public void EnqueueMessage(Message message) {
this.OutgoingMessageQueue.Post(message);
}
}
static async Task RunAsync(IPEndPoint endPoint, PublicState state) {
using (TcpClient tcp = new TcpClient()) {
await tcp.ConnectAsync(endPoint.Address, endPoint.Port).ConfigureAwait(false);
Byte[] reusableBuffer = new Byte[4096];
using (NetworkStream ns = tcp.GetStream()) {
state.ConnectedTcs.SetResult(null);
Task<Int32> nsReadTask = null;
Task<Message> newMessageTask = null;
while (tcp.Connected) {
// Wait for new data to arrive from remote host or for new messages to send:
if (nsReadTask == null)
nsReadTask = ns.ReadAsync(reusableBuffer, 0, 0);
if (newMessageTask == null)
newMessageTask = state.OutgoingMessageQueue.ReceiveAsync();
var completed = await Task.WhenAny(nsReadTask, newMessageTask).ConfigureAwait(false);
if (completed == newMessageTask) {
var result = await newMessageTask;
// do stuff
newMessageTask = null;
}
else {
var bytesRead = await nsReadTask;
nsReadTask = null;
}
}
}
}
}
As a bonus, this version is (I think) thread-safe, while your current version is not, because you are doing non-thread-safe things with OutgoingMessageTcs
from potentially multiple threads (thread of RunAsync
and thread of EnqueueMessages
caller).
If for some reason you don't like BufferBlock
- you can use AsyncCollection
from Nito.AsyncEx
nuget package in exactly the same way. Initialization becomes:
internal readonly AsyncCollection<Message> OutgoingMessageQueue = new AsyncCollection<Message>(new ConcurrentQueue<Message>());
And fetching:
if (newMessageTask == null)
newMessageTask = state.OutgoingMessageQueue.TakeAsync();
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With