
What is the best way to wait on a network packet using C#'s new async feature?

I've recently been playing around with the new Async CTP, and I've come across a situation where I'm not sure how to proceed.

In my current code base, I'm using a concept of "jobs" and a "job manager". Jobs exist solely for the purpose of handling an initial message, sending a reply, and then waiting for the response to that reply.

I already have existing code based around synchronous sockets, where a network thread is waiting on data to arrive, and then passing it along to an event handler, and eventually to the job manager.

The job manager looks for what job would handle the message, and passes it along.

So the scenario is this:

  1. Job manager gets a new message and launches a job.
  2. The job starts, processes the message, and sends a reply message.
  3. At this point the job would wait for a response to the reply.

Here's a pseudocode example:

class MyJob : Job
{
    // 'async' is required for the method body to use 'await'.
    public override async void RunJob( IPacketMsg packet )
    {
        // handle packet

        var myReply = new Packet();
        SendReply( myReply );

        await GetResponse();
    }
}

But I'm not entirely sure how to proceed at step 3. The job manager will get the response and then hand it along to the running job. But I'm not sure how to make the job wait for the response.

I've considered creating an awaited Task that simply blocks on a WaitHandle, but is this the best solution?

Are there any other things I could do in this case?

Edit: On the subject of the Async CTP, what happens in a situation where no UI is being used? I've read over Eric Lippert's async blog series, but I don't believe it ever touched on how everything works in the background without a UI thread (does it spin off a background worker or...?)

Ryan Stecker asked Sep 12 '11 19:09



2 Answers

  1. Job manager gets a new message and launches a job.
  2. The job starts, processes the message, and sends a reply message.
  3. At this point the job would wait for a response to the reply.

First off, I should mention that the Async CTP handles asynchronous operations very well, but asynchronous events not so much. You may want to consider an Rx-based approach. But let's proceed for the moment with the Async CTP.

You have two basic options to create Tasks:

  • With a delegate. e.g., Task.Factory.StartNew will run a delegate on the thread pool. Custom task factories and schedulers give you more options for task delegates (e.g., specifying the delegate must be run on an STA thread).
  • Without a delegate. e.g., TaskFactory.FromAsync wraps an existing Begin/End method pair, TaskEx.FromResult returns a "future constant", and TaskCompletionSource can be used to control a Task explicitly (both FromAsync and FromResult use TCS internally).
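As a rough illustration of the two options above, here is a minimal sketch. It assumes the released Task API (.NET 4.5 or later), where the CTP's TaskEx.FromResult became Task.FromResult; the class and method names are hypothetical and chosen only for this example.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Hypothetical helper class, not part of the original answer.
public static class TaskCreationDemo
{
    // With a delegate: StartNew queues the lambda to the current scheduler,
    // which is the thread pool unless a custom scheduler is in effect.
    public static Task<int> SumOnThreadPool(int[] values)
    {
        return Task.Factory.StartNew(() =>
        {
            int sum = 0;
            foreach (int v in values) sum += v;
            return sum;
        });
    }

    // Without a delegate: wrap an existing Begin/End method pair.
    // The task completes with the number of bytes read.
    public static Task<int> ReadAsync(Stream stream, byte[] buffer)
    {
        return Task<int>.Factory.FromAsync(
            stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);
    }

    // Without a delegate: a task that is already completed with a known value.
    public static Task<int> Constant(int value)
    {
        return Task.FromResult(value);
    }
}
```

TaskCompletionSource, the third delegate-less option, is the one the job manager below relies on.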

If the job processing is CPU-bound, it makes sense to pass it off to Task.Factory.StartNew. I'm going to assume the job processing is CPU-bound.

Job manager pseudo-code:

// Responds to a new message by starting a new job on the thread pool.
private void RespondToNewMessage(IPacketMsg message)
{
  IJob job = ..;
  // StartNew takes a delegate; calling job.RunJob(message) directly would run it here.
  Task.Factory.StartNew(() => job.RunJob(message));
}

// Holds tasks waiting for a response.
private ConcurrentDictionary<int, TaskCompletionSource<IResponse>> responseTasks = ..;

// Asynchronously gets a response for the specified reply.
public Task<IResponse> GetResponseForReplyAsync(int replyId)
{
  var tcs = new TaskCompletionSource<IResponse>();
  // ConcurrentDictionary has no Add; TryAdd returns false if the id is already registered.
  responseTasks.TryAdd(replyId, tcs);
  return tcs.Task;
}

// Responds to a new response by completing and removing its task.
private void RespondToResponse(IResponse response)
{
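  TaskCompletionSource<IResponse> tcs;
  // Remove the registration first, then complete the task.
  // A response with no registered task is ignored.
  if (responseTasks.TryRemove(response.ReplyId, out tcs))
    tcs.TrySetResult(response);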
}

The idea is that the job manager also manages a list of outstanding responses. To make this possible, I introduced a simple int reply identifier that the job manager can use to determine which response goes with which reply.

Now jobs can work like this:

public override async void RunJob(IPacketMsg packet)
{
  // handle packet
  var myReply = new Packet();
  var response = jobManager.GetResponseForReplyAsync(myReply.ReplyId);
  SendReply(myReply);

  await response;
}

There are a few tricky things, since we're placing the jobs on thread pool threads:

  1. GetResponseForReplyAsync must be invoked (registering the task) before the reply is sent, and is then awaited later. This is to avoid the situation where a reply may be sent and a response received before we have a chance to register for it.
  2. RespondToResponse will remove the task registration before completing it, just in case completing the task causes another reply to be sent with the same id.
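Both points can be seen together in a self-contained sketch. This is only an illustration under stated assumptions: Response, ResponseRegistry, Complete and JobDemo are hypothetical names that stand in for IResponse and the job manager, and the code targets the released Task API (.NET 4.6 or later, for RunContinuationsAsynchronously) rather than the Async CTP.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-in for the response message type.
public sealed class Response
{
    public int ReplyId { get; }
    public string Body { get; }
    public Response(int replyId, string body) { ReplyId = replyId; Body = body; }
}

// Hypothetical registry of outstanding replies, keyed by reply id.
public sealed class ResponseRegistry
{
    private readonly ConcurrentDictionary<int, TaskCompletionSource<Response>> pending =
        new ConcurrentDictionary<int, TaskCompletionSource<Response>>();

    // Registers interest in a response. Call this BEFORE sending the reply.
    public Task<Response> GetResponseForReplyAsync(int replyId)
    {
        // RunContinuationsAsynchronously keeps the awaiting job from resuming
        // inline on the network thread that calls Complete.
        var tcs = new TaskCompletionSource<Response>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        if (!pending.TryAdd(replyId, tcs))
            throw new InvalidOperationException("Reply id already registered: " + replyId);
        return tcs.Task;
    }

    // Called by the network side when a response arrives.
    // Returns false if nobody was waiting for this reply id.
    public bool Complete(Response response)
    {
        TaskCompletionSource<Response> tcs;
        // Remove first, then complete, so a continuation can reuse the id.
        if (!pending.TryRemove(response.ReplyId, out tcs))
            return false;
        return tcs.TrySetResult(response);
    }
}

public static class JobDemo
{
    // Registers for the response, then (conceptually) sends the reply, then awaits.
    public static async Task<string> RunJobAsync(ResponseRegistry registry, int replyId)
    {
        Task<Response> pendingResponse = registry.GetResponseForReplyAsync(replyId);
        // ... send the reply here ...
        Response response = await pendingResponse;
        return response.Body;
    }
}
```

Returning Task from the job instead of async void is a design choice of this sketch: it lets the caller observe completion and exceptions, which the void-returning RunJob in the answer cannot.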

If the jobs are short enough that they don't need to be placed on a thread pool thread, then the solution can be simplified.

Stephen Cleary answered Oct 26 '22 22:10



You would simply wire up the rest of your event handler with the await pattern like so:

 public async void RunJob(IPacketMsg msg)
 {
     // Do Stuff

     var response = await GetResponse();

     // response is "string", not "Task<string>"

     // Do More Stuff
 }

 public Task<string> GetResponse()
 {
     return Task.Factory.StartNew(() =>
        {
             _networkThingy.WaitForDataAvailable();

             return _networkThingy.ResponseString;
        });
 }

When the task returned by GetResponse finishes, the rest of the method resumes on the synchronization context that was current at the await. Until then, the method yields to its caller, so no code after the await runs until that task finishes.
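On the question's point about running without a UI: when no synchronization context is installed (for example in a console application or on a plain worker thread), the continuation is scheduled on the thread pool by default rather than on a dedicated background worker. The following minimal sketch shows one way to observe that, assuming the released Task API (.NET 4.5 or later); ContextDemo is a hypothetical name used only here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper class, not part of the original answer.
public static class ContextDemo
{
    // Reports whether the code after the await ran on a thread pool thread.
    // Intended to be called from a thread with no SynchronizationContext.
    public static async Task<bool> ResumesOnThreadPoolAsync()
    {
        // Task.Run queues the delegate to the thread pool; the short sleep
        // makes it very likely the task is still running when awaited.
        await Task.Run(() => Thread.Sleep(50));

        // With no captured context, the continuation runs on a pool thread.
        return Thread.CurrentThread.IsThreadPoolThread;
    }
}
```

In a UI application the same method would instead resume on the UI thread, because the UI framework installs a synchronization context that the await captures.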

Tejs answered Oct 26 '22 22:10
