Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do I adapt DatagramSocket.MessageReceived for use with async/await?

I would like to wrap the following datagram socket operation with TPL to clean up the API so that it works nicely with async and await, much like the StreamSocket class does.

public static async Task<bool> TestAsync(HostName hostName, string serviceName, byte[] data)
{
    var tcs = new TaskCompletionSource<bool>();
    var socket = new DatagramSocket();
    socket.MessageReceived += (sender, e) =>
    {
        var status = false; // Status value somehow derived from e etc.
        tcs.SetResult(status);
    };
    await socket.ConnectAsync(hostName, serviceName);
    var stream = await socket.GetOutputStreamAsync();
    var writer = new DataWriter(stream);
    writer.WriteBytes(data);
    await writer.StoreAsync();
    return tcs.Task;
}

The sticking point is the MessageReceived event which turns the DatagramSocket class into a strange mishmash of the event asynchronous pattern and the new async pattern. Anyway, TaskCompletionSource<T> allows me to adapt the handler to conform to the latter, so that's not too scary.

This seems to work pretty well unless the endpoint never returns any data. The task associated with the MessageReceived handler never completes and thus the task returned from TestAsync never completes.

What's the correct way to nicely wrap this operation to incorporate a timeout and cancellation? I would like to extend this function to take a CancellationToken argument for the latter, but what do I do with it? The only thing I've come up with is to create an additional "monitoring" task using Task.Delay to which I pass a timeout value and the cancellation token to support these two behaviours along the following lines:

public static async Task<bool> CancellableTimeoutableTestAsync(HostName hostName, string serviceName, byte[] data, CancellationToken userToken, int timeout)
{
    var tcs = new TaskCompletionSource<bool>();
    var socket = new DatagramSocket();
    socket.MessageReceived += (sender, e) =>
    {
        var status = false; // Status value somehow derived from e etc.
        tcs.SetResult(status);
    };
    await socket.ConnectAsync(hostName, serviceName);
    var stream = await socket.GetOutputStreamAsync();
    var writer = new DataWriter(stream);
    writer.WriteBytes(data);
    await writer.StoreAsync();

    var delayTask = Task.Delay(timeout, userToken);
    var t1 = delayTask.ContinueWith(t => { /* Do something to tcs to indicate timeout */ }, TaskContinuationOptions.OnlyOnRanToCompletion);
    var t2 = delayTask.ContinueWith(t => { tcs.SetCanceled(); }, TaskContinuationOptions.OnlyOnCanceled);

    return tcs.Task;
}

However, this has all kinds of problems including potential race conditions between the delay task and the MessageReceived handler. I have never been able to get this approach to work reliably, plus it seems ridiculously complicated as well as an inefficient use of the thread pool. It's fiddly, error-prone and hurts my head.

Side note: Am I the only person who is confused by the DatagramSocket API in general? Not only does it seem to be an ugly conglomeration of the IAsyncAction WinRT model and TPL with some tricky EAP thrown in, I'm not terribly comfortable with an API that is intended to represent a fundamentally connectionless protocol such as UDP containing methods named ConnectAsync in them. This seems to be a contradiction in terms to me.

like image 740
Richard Cook Avatar asked Nov 04 '22 05:11

Richard Cook


1 Answers

First, I think the interface of DatagramSocket makes sense precisely because of the nature of UDP. If you have a stream of datagrams, then an event is an appropriate way to represent that. WinRT IAsyncAction (or .Net Task) can only represent pull model, where you explicitly request each piece of data (e.g. there could be a method ReadNextDatagramAsync()). This makes sense for TCP, because it has flow control, so if you read the data slowly, the sender will send them slowly too. But for UDP, the push model (represented by an event in WinRT and .Net) makes much more sense.

And I agree that the name Connect doesn't make 100 % sense, but I think it mostly does makes sense, especially to make it more consistent with StreamSocket. And you do need a method like this, so that the system can resolve the domain name and assign a port to your socket.

For your method, I agree with @usr that you should create a separate method for receiving the datagram. And if you want to convert one asynchronous model to another, while adding features the original model doesn't support natively, it is going to fiddly, I think there is nothing you can do about that.

It also won't be inefficient, if you implement it properly: you should make sure that after the Task has completed, the MessageReceived event is unsubscribed, the timer associated with Delay() is disposed (you do that by canceling the token you passed to Delay()) and the delegate that was registered with the passed-in CancellationToken is unregisted (I think you should use Register() directly instead of (ab)using Delay() for that).

Regarding race conditions, of course you have to think about them. But there is a relatively simple way to deal with that here: use the Try methods of TaskCompletionSource (e.g. TrySetResult()).

like image 187
svick Avatar answered Nov 12 '22 13:11

svick