Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Scaling Connections with BlockingCollection<T>()

I have a server which communicates with 50 or more devices over TCP LAN. There is a Task.Run for each socket reading message loop.

I buffer each message reach into a blocking queue, where each blocking queue has a Task.Run using a BlockingCollection.Take().

So something like (semi-pseudocode):

Socket Reading Task

Task.Run(() =>
{
    while (notCancelled)
    {
        element = ReadXml();
        switch (element)
        {
            case messageheader:
                MessageBlockingQueue.Add(deserialze<messageType>());
            ...
        }
    }
});

Message Buffer Task

Task.Run(() =>
{
    while (notCancelled)
    {
        Process(MessageQueue.Take());
    }
});

So that would make 50+ reading tasks and 50+ tasks blocking on their own buffers.

I did it this way to avoid blocking the reading loop and allow the program to distribute processing time on messages more fairly, or so I believe.

Is this an inefficient way to handle it? what would be a better way?

like image 577
FinalFortune Avatar asked Nov 20 '18 08:11

FinalFortune


2 Answers

You may be interested in the "channels" work, in particular: System.Threading.Channels. The aim of this is to provider asynchronous producer/consumer queues, covering both single and multiple producer and consumer scenarios, upper limits, etc. By using an asynchronous API, you aren't tying up lots of threads just waiting for something to do.

Your read loop would become:

while (notCancelled) {
    var next = await queue.Reader.ReadAsync(optionalCancellationToken);
    Process(next);
}

and the producer:

switch (element)
{
    case messageheader:
        queue.Writer.TryWrite(deserialze<messageType>());
        ...
}

so: minimal changes


Alternatively - or in combination - you could look into things like "pipelines" (https://www.nuget.org/packages/System.IO.Pipelines/) - since you're dealing with TCP data, this would be an ideal fit, and is something I've looked at for the custom web-socket server here on Stack Overflow (which deals with huge numbers of connections). Since the API is async throughout, it does a good job of balancing work - and the pipelines API is engineered with typical TCP scenarios in mind, for example partially consuming incoming data streams as you detect frame boundaries. I've written about this usage a lot, with code examples mostly here. Note that "pipelines" doesn't include a direct TCP layer, but the "kestrel" server includes one, or the third-party library https://www.nuget.org/packages/Pipelines.Sockets.Unofficial/ does (disclosure: I wrote it).

like image 56
Marc Gravell Avatar answered Oct 03 '22 00:10

Marc Gravell


I actually do something similar in another project. What I learned or would do differently are the following:

  1. First of all, better to use dedicated threads for the reading/writing loop (with new Thread(ParameterizedThreadStart)) because Task.Run uses a pool thread and as you use it in a (nearly) endless loop the thread is practically never returned to the pool.

    var thread = new Thread(ReaderLoop) { Name = nameof(ReaderLoop) }; // priority, etc if needed
    thread.Start(cancellationToken);
    
  2. Your Process can be an event, which you can invoke asynchronously so your reader loop can be return immediately to process the new incoming packages as fast as possible:

    private void ReaderLoop(object state)
    {
        var token = (CancellationToken)state;
        while (!token.IsCancellationRequested)
        {
            try
            {
                var message = MessageQueue.Take(token);
                OnMessageReceived(new MessageReceivedEventArgs(message));
            }
            catch (OperationCanceledException)
            {
                if (!disposed && IsRunning)
                    Stop();
                break;
            }
        }
    }
    

Please note that if a delegate has multiple targets it's async invocation is not trivial. I created this extension method for invoking a delegate on pool threads:

public static void InvokeAsync<TEventArgs>(this EventHandler<TEventArgs> eventHandler, object sender, TEventArgs args)
{
    void Callback(IAsyncResult ar)
    {
        var method = (EventHandler<TEventArgs>)ar.AsyncState;
        try
        {
            method.EndInvoke(ar);
        }
        catch (Exception e)
        {
            HandleError(e, method);
        }
    }

    foreach (EventHandler<TEventArgs> handler in eventHandler.GetInvocationList())
        handler.BeginInvoke(sender, args, Callback, handler);
}

So the OnMessageReceived implementation can be:

protected virtual void OnMessageReceived(MessageReceivedEventArgs e)
    => messageReceivedHandler.InvokeAsync(this, e);
  1. Finally it was a big lesson that BlockingCollection<T> has some performance issues. It uses SpinWait internally, whose SpinOnce method waits longer and longer times if there is no incoming data for a long time. This is a tricky issue because even if you log every single step of the processing you will not notice that everything is started delayed unless you can mock also the server side. Here you can find a fast BlockingCollection implementation using an AutoResetEvent for triggering incoming data. I added a Take(CancellationToken) overload to it as follows:

    /// <summary>
    /// Takes an item from the <see cref="FastBlockingCollection{T}"/>
    /// </summary>
    public T Take(CancellationToken token)
    {
        T item;
        while (!queue.TryDequeue(out item))
        {
            waitHandle.WaitOne(cancellationCheckTimeout); // can be 10-100 ms
            token.ThrowIfCancellationRequested();
        }
    
        return item;
    }
    

Basically that's it. Maybe not everything is applicable in your case, eg. if the nearly immediate response is not crucial the regular BlockingCollection also will do it.

like image 43
György Kőszeg Avatar answered Oct 03 '22 02:10

György Kőszeg