Network Command Processing with TPL Dataflow

I'm working on a system that involves accepting commands over a TCP network connection, then sending responses upon execution of those commands. Fairly basic stuff, but I'm looking to support a few requirements:

  1. Multiple clients can connect at the same time and establish separate sessions. Sessions can last as long or as short as desired, and the same client IP can establish multiple parallel sessions.
  2. Each session can process multiple commands at the same time, as some of the requested operations can be performed in parallel.

I'd like to implement this cleanly using async/await. Based on what I've read, TPL Dataflow sounds like a good way to break the processing into chunks that run on the thread pool, instead of tying up a thread per session or command that blocks on wait handles.

This is what I'm starting with (some parts stripped out to simplify, such as details of exception handling; I've also omitted a wrapper that provides an efficient awaitable for the network I/O):

    private readonly Task _serviceTask;
    private readonly Task _commandsTask;
    private readonly CancellationTokenSource _cancellation;
    private readonly BufferBlock<Command> _pendingCommands;

    public NetworkService(ICommandProcessor commandProcessor)
    {
        _commandProcessor = commandProcessor;
        IsRunning = true;
        _cancellation = new CancellationTokenSource();
        _pendingCommands = new BufferBlock<Command>();
        _serviceTask = Task.Run((Func<Task>)RunService);
        _commandsTask = Task.Run((Func<Task>)RunCommands);
    }

    public bool IsRunning { get; private set; }

    private async Task RunService()
    {
        _listener = new TcpListener(IPAddress.Any, ServicePort);
        _listener.Start();

        while (IsRunning)
        {
            Socket client = null;
            try
            {
                client = await _listener.AcceptSocketAsync();
                client.Blocking = false;

                var session = RunSession(client);
                lock (_sessions)
                {
                    _sessions.Add(session);
                }
            }
            catch (Exception ex)
            { //Handling here...
            }
        }
    }

    private async Task RunCommands()
    {
        while (IsRunning)
        {
            var command = await _pendingCommands.ReceiveAsync(_cancellation.Token);
            var task = Task.Run(() => RunCommand(command));
        }
    }

    private async Task RunCommand(Command command)
    {
        try
        {
            var response = await _commandProcessor.RunCommand(command.Content);
            Send(command.Client, response);
        }
        catch (Exception ex)
        {
            //Deal with general command exceptions here...
        }
    }

    private async Task RunSession(Socket client)
    {
        while (client.Connected)
        {
            var reader = new DelimitedCommandReader(client);

            try
            {
                var content = await reader.ReceiveCommand();
                _pendingCommands.Post(new Command(client, content));
            }
            catch (Exception ex)
            {
                //Exception handling here...
            }
        }
    }

The basics seem straightforward, but one part is tripping me up: how do I make sure that when I'm shutting down the application, I wait for all pending command tasks to complete? I get the Task object when I use Task.Run to execute the command, but how do I keep track of pending commands so that I can make sure that all of them are complete before allowing the service to shut down?

I've considered using a simple List, with removal of commands from the List as they finish, but I'm wondering if I'm missing some basic tools in TPL Dataflow that would allow me to accomplish this more cleanly.
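For reference, the list-style tracking mentioned above could be sketched roughly as follows. This is only an illustration: `inFlight`, `Track`, and the simulated commands are hypothetical names, and a `ConcurrentDictionary` stands in for the "simple List" so that removal from multiple threads is safe.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical tracker for in-flight command tasks, keyed by a running id.
var inFlight = new ConcurrentDictionary<int, Task>();
int nextId = 0;
int completed = 0;

// Starts a command on the thread pool and forgets it once it has finished.
void Track(Func<Task> command)
{
    int id = Interlocked.Increment(ref nextId);
    Task task = Task.Run(command);
    inFlight[id] = task;
    // Registered after the add, so the removal can never run before the entry exists.
    task.ContinueWith(t => inFlight.TryRemove(id, out _), TaskScheduler.Default);
}

// Simulate three commands arriving from sessions.
for (int i = 0; i < 3; i++)
{
    Track(async () =>
    {
        await Task.Delay(20); // stand-in for real command work
        Interlocked.Increment(ref completed);
    });
}

// Shutdown: stop accepting new commands first, then wait for whatever is still running.
await Task.WhenAll(inFlight.Values);
Console.WriteLine($"Commands completed before shutdown: {completed}");
```

The snapshot taken by `inFlight.Values` only covers commands that were already tracked, so this approach assumes no new commands are accepted once shutdown begins.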


EDIT:

Reading more about TPL Dataflow, I'm wondering if what I should be using is a TransformBlock with an increased MaxDegreeOfParallelism to allow processing parallel commands? This sets an upper limit on the number of commands that can run in parallel, but that's a sensible limitation for my system, I think. I'm curious to hear from those who have experience with TPL Dataflow to know if I'm on the right track.

Dan Bryant asked Jan 31 '14 16:01


1 Answer

Yeah, so... you're kinda half using the power of TPL here. Manually receiving items from the BufferBlock in your own while loop in a background Task is not the "way" you want to do it if you're subscribing to the TPL Dataflow style.

What you would do is link an ActionBlock to the BufferBlock and do your command processing/sending from within that. This is also the block where you would set the MaxDegreeOfParallelism to control just how many concurrent commands you want to process. So that setup might look something like this:

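    // Initialization logic to build up the TPL flow
    _pendingCommands = new BufferBlock<Command>();
    _commandProcessor = new ActionBlock<Command>(this.ProcessCommand);

    // LinkTo does not propagate completion by default; enable it so that completing
    // the buffer eventually completes the ActionBlock as well.
    _pendingCommands.LinkTo(_commandProcessor, new DataflowLinkOptions { PropagateCompletion = true });

    // The method must be async because it awaits.
    // _processor stands for the question's ICommandProcessor; it is renamed in this
    // sketch because _commandProcessor now names the ActionBlock.
    private async Task ProcessCommand(Command command)
    {
        var response = await _processor.RunCommand(command.Content);
        this.Send(command.Client, response);
    }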
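The MaxDegreeOfParallelism setting mentioned above is passed through `ExecutionDataflowBlockOptions`. Here is a minimal, self-contained sketch; the cap of 4, the string commands, and the counters are illustrative assumptions, and `Task.Delay` stands in for the real command processor:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var gate = new object();
int running = 0, peak = 0, processed = 0;

// Example cap only; the right value depends on the workload.
var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };

var commandBlock = new ActionBlock<string>(async command =>
{
    // Track how many commands are executing at once.
    lock (gate) { running++; peak = Math.Max(peak, running); }
    await Task.Delay(25); // stand-in for awaiting the real command processor
    lock (gate) { running--; processed++; }
}, options);

for (int i = 0; i < 20; i++)
{
    commandBlock.Post($"command {i}");
}

// No more input; wait for the queued commands to drain.
commandBlock.Complete();
await commandBlock.Completion;
Console.WriteLine($"processed={processed}, peak concurrency={peak}");
```

The block never runs more than the configured number of commands at the same time, so the observed peak stays at or below 4 while all 20 commands are still processed.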

Then, in your shutdown code, you would need to signal that you're done adding items into the pipeline by calling Complete on the _pendingCommands BufferBlock and then wait on the _commandProcessor ActionBlock to complete to ensure that all items have made their way through the pipeline. For the ActionBlock to complete, the link between the two blocks must be created with PropagateCompletion set to true; by default, LinkTo does not propagate completion. You wait by grabbing the Task returned by the block's Completion property and calling Wait on it:

    _pendingCommands.Complete();
    _commandProcessor.Completion.Wait();
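As a runnable illustration of that shutdown sequence, here is a small sketch. The block names, the integer items, and the delay are placeholders rather than the question's types, and it awaits Completion instead of calling Wait, which is an alternative when the shutdown code is itself async:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

int handled = 0;

var pending = new BufferBlock<int>();
var worker = new ActionBlock<int>(async item =>
{
    await Task.Delay(10); // stand-in for processing and sending a response
    Interlocked.Increment(ref handled);
});

// Without PropagateCompletion, completing "pending" would never complete "worker".
pending.LinkTo(worker, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 10; i++)
{
    pending.Post(i);
}

// Shutdown: signal that no more items will arrive, then wait for the pipeline to drain.
pending.Complete();
await worker.Completion;
Console.WriteLine($"handled {handled} items before shutdown");
```

Awaiting the Completion task does not block a thread while the remaining items drain.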

If you want to go for bonus points, you can even separate the command processing from the command sending. This would allow you to configure those steps separately from one another. For example, maybe you need to limit the number of threads processing commands, but want to have more sending out the responses. You would do this by simply introducing a TransformBlock into the middle of the flow:

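    _pendingCommands = new BufferBlock<Command>();
    _commandProcessor = new TransformBlock<Command, Tuple<Client, Response>>(this.ProcessCommand);
    _commandSender = new ActionBlock<Tuple<Client, Response>>(this.SendResponseToClient);

    // Propagate completion along both links so that shutdown can wait on _commandSender.Completion.
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    _pendingCommands.LinkTo(_commandProcessor, linkOptions);
    _commandProcessor.LinkTo(_commandSender, linkOptions);

    // _processor stands for the question's ICommandProcessor; it is renamed in this
    // sketch because _commandProcessor now names the TransformBlock.
    private async Task<Tuple<Client, Response>> ProcessCommand(Command command)
    {
        var response = await _processor.RunCommand(command.Content);

        // Pair the response with the client that issued the command.
        return Tuple.Create(command.Client, response);
    }

    private void SendResponseToClient(Tuple<Client, Response> clientAndResponse)
    {
        this.Send(clientAndResponse.Item1, clientAndResponse.Item2);
    }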

You probably want to use your own data structure instead of Tuple; it was used here only for illustration. The point is that this is exactly the kind of structure you want to use to break up the pipeline, so that you can control each stage of it exactly as you need.
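To make the "configure those steps separately" idea concrete, here is a self-contained sketch of the two-stage flow. Everything specific in it is an assumption for illustration: the string client ids, the upper-casing "processor", the parallelism values of 2 and 8, and the named value tuples, which are used for brevity where a dedicated type would work the same way.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var sent = new ConcurrentBag<string>();
var link = new DataflowLinkOptions { PropagateCompletion = true };

var pending = new BufferBlock<(string Client, string Content)>();

// Processing stage: limited to two commands at a time (example value).
var process = new TransformBlock<(string Client, string Content), (string Client, string Response)>(
    async command =>
    {
        await Task.Delay(10); // stand-in for the real command processor
        return (command.Client, command.Content.ToUpperInvariant());
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

// Sending stage: allowed more concurrency than processing (example value).
var send = new ActionBlock<(string Client, string Response)>(
    result => sent.Add($"{result.Client}: {result.Response}"),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

pending.LinkTo(process, link);
process.LinkTo(send, link);

foreach (var text in new[] { "ping", "status", "quit" })
{
    pending.Post((Client: "client-1", Content: text));
}

// Completion flows from the buffer through both stages.
pending.Complete();
await send.Completion;
Console.WriteLine(string.Join(", ", sent)); // order of sends is not guaranteed
```

Because each block takes its own options, the processing limit and the sending limit can be tuned independently without touching the other stage.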

Drew Marsh answered Oct 20 '22 22:10