Named pipes efficient asynchronous design

The problem:

To design an efficient and very fast named-pipe client/server framework.

Current state:

I already have a battle-proven, production-tested framework. It is fast, but it uses one thread per pipe connection, so with many clients the number of threads can quickly become too high. I already use a smart thread pool (a task pool, in fact) that scales with demand.

I already use OVERLAPPED mode for the pipes, but I then block in WaitForSingleObject or WaitForMultipleObjects, which is why I need one thread per connection on the server side.

Desired solution:

The client is fine as it is, but on the server side I would like to use one thread per client request rather than per connection. Instead of dedicating one thread to the whole lifecycle of a client (connect to disconnect), I would use one thread per task, that is, only while a client is actually requesting data and no longer.

I saw an example on MSDN that keeps an array of OVERLAPPED structures and uses WaitForMultipleObjects to wait on all of them. I find this a bad design, for two reasons. First, you have to maintain an array that can grow quite large, and deletions will be costly. Second, you need a lot of events, one for each array member.

I also saw completion ports, like CreateIoCompletionPort and GetQueuedCompletionStatus, but I don't see how they are any better.

What I would like is something like what ReadFileEx and WriteFileEx do: they call a callback routine when the operation completes. That is a truly asynchronous style of programming. The problem is that ConnectNamedPipe does not support this, and furthermore the thread needs to be in an alertable state, which means calling one of the *Ex wait functions (such as SleepEx or WaitForSingleObjectEx).

So how is such a problem best solved?

Here is how MSDN does it: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365603(v=vs.85).aspx

The problem I see with this approach is that I can't see how you could have 100 clients connected at once if WaitForMultipleObjects is limited to 64 handles. Sure, I could disconnect the pipe after each request, but the idea is to keep a permanent client connection, just like in a TCP server, and to track each client through its whole life cycle, with a unique ID and client-specific data.
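To put a number on that limit, here is a minimal sketch of the arithmetic, assuming every connected client needs its own event handle in some WaitForMultipleObjects call. The helper name waiter_threads_needed is made up for illustration, and MAXIMUM_WAIT_OBJECTS is redefined locally only so the snippet compiles without the Windows headers.

```c
#include <assert.h>

/* 64 in the Windows SDK headers; defined here only so the sketch
   compiles without <windows.h>. */
#ifndef MAXIMUM_WAIT_OBJECTS
#define MAXIMUM_WAIT_OBJECTS 64
#endif

/* Hypothetical helper: how many blocking waiter threads are required
   if each connected client occupies one slot in a
   WaitForMultipleObjects call (ceiling division by the slot limit). */
int waiter_threads_needed(int clients)
{
    assert(clients >= 0);
    return (clients + MAXIMUM_WAIT_OBJECTS - 1) / MAXIMUM_WAIT_OBJECTS;
}
```

For 100 connected clients this gives 2 waiter threads, and the count keeps growing by one for every further 64 clients, which is exactly the scaling I want to avoid.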

The ideal pseudo code should be like this:

repeat
  // wait for the connection or for one client to send data
  Result = ConnectNamedPipe or ReadFile or Disconnect; 

  case Result of
    CONNECTED: CreateNewClient; // we create a new client
    DATA: AssignWorkerThread; // here we process client request in a thread
    DISCONNECT: CleanupAndDeleteClient // release the client object and data
  end;
until Aborted;

This way we have only one listener thread that accepts connect / disconnect / onData events. The thread pool (worker threads) only processes the actual requests, so 5 worker threads can serve a large number of connected clients.

P.S. My current code should not be important. I code this in Delphi, but it's pure WinAPI, so the language does not matter.

EDIT:

For now IOCP looks like the solution:

I/O completion ports provide an efficient threading model for processing multiple asynchronous I/O requests on a multiprocessor system. When a process creates an I/O completion port, the system creates an associated queue object for requests whose sole purpose is to service these requests. Processes that handle many concurrent asynchronous I/O requests can do so more quickly and efficiently by using I/O completion ports in conjunction with a pre-allocated thread pool than by creating threads at the time they receive an I/O request.
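As a rough sketch of how the CONNECTED / DATA / DISCONNECT cases from the pseudo code might map onto completions dequeued with GetQueuedCompletionStatus, the classification step could look something like the function below. This is only an assumption about one possible design, not a tested server: PendingOp, PipeEvent and classify_completion are hypothetical names, the listener is assumed to remember which call each OVERLAPPED belongs to, and the error constants are redefined with their winerror.h values only so the snippet compiles off Windows. Any error code not listed falls through to EV_ERROR for the caller to decide.

```c
#include <assert.h>

#ifdef _WIN32
#include <windows.h>
#else
/* Values copied from winerror.h so the sketch also compiles elsewhere. */
typedef unsigned long DWORD;
typedef int BOOL;
#define ERROR_BROKEN_PIPE        109UL
#define ERROR_PIPE_NOT_CONNECTED 233UL
#endif

/* Hypothetical tag kept next to each OVERLAPPED so the listener knows
   which call (ConnectNamedPipe or ReadFile) has just completed. */
typedef enum { OP_CONNECT, OP_READ } PendingOp;

/* The three events from the pseudo code, plus a catch-all. */
typedef enum { EV_CONNECTED, EV_DATA, EV_DISCONNECT, EV_ERROR } PipeEvent;

/* ok  : return value of GetQueuedCompletionStatus for this packet
   err : GetLastError() taken right after the call when ok is FALSE */
PipeEvent classify_completion(PendingOp op, BOOL ok, DWORD err)
{
    if (op == OP_CONNECT)
        return ok ? EV_CONNECTED : EV_ERROR;

    assert(op == OP_READ);
    if (ok)
        return EV_DATA;              /* hand the buffer to a worker  */
    if (err == ERROR_BROKEN_PIPE || err == ERROR_PIPE_NOT_CONNECTED)
        return EV_DISCONNECT;        /* client closed its end        */
    return EV_ERROR;                 /* anything else: caller decides */
}
```

The listener thread would call this once per dequeued packet and then create the client, queue a worker task, or clean up, so no thread is ever parked on a single connection.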

Runner asked Jul 23 '13 09:07


2 Answers

If the server must handle more than 64 events (reads/writes), then any solution using WaitForMultipleObjects becomes unfeasible. This is the reason Microsoft introduced I/O completion ports to Windows. They can handle a very high number of I/O operations using the most appropriate number of threads (usually the number of processors/cores).

The problem with IOCP is that it is very difficult to implement correctly. Hidden issues are scattered like mines in a field: [1], [2] (section 3.6). I would recommend using a framework. A little googling suggests something called Indy for Delphi developers. There may be others.

At this point I would disregard the requirement for named pipes if that means coding my own IOCP implementation. It's not worth the grief.

Dialecticus answered Nov 19 '22 01:11


I think what you're overlooking is that you only need a few listening named pipe instances at any given time. Once a pipe instance has connected, you can spin that instance off and create a new listening instance to replace it.

With MAXIMUM_WAIT_OBJECTS (or fewer) listening named pipe instances, you can have a single thread dedicated to listening using WaitForMultipleObjectsEx. The same thread can also handle the rest of the I/O using ReadFileEx and WriteFileEx and APCs. The worker threads would queue APCs to the I/O thread in order to initiate I/O, and the I/O thread can use the task pool to return the results (as well as letting the worker threads know about new connections).

The I/O thread main function would look something like this:

create_events();

// Start with one listening pipe instance in every wait slot.
for (index = 0; index < MAXIMUM_WAIT_OBJECTS; index++) new_pipe(index);

for (;;)
{
    if (service_stopping && active_instances == 0) break;

    // Alertable wait: wakes when a connect event is signalled
    // or after queued APCs / I/O completion routines have run.
    result = WaitForMultipleObjectsEx(MAXIMUM_WAIT_OBJECTS, connect_events,
                    FALSE, INFINITE, TRUE);

    if (result == WAIT_IO_COMPLETION)
    {
        // The APCs have already done their work; just wait again.
        continue;
    }
    else if (result >= WAIT_OBJECT_0 &&
                     result < WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS)
    {
        // A pending ConnectNamedPipe finished on this slot.
        index = result - WAIT_OBJECT_0;
        ResetEvent(connect_events[index]);

        if (GetOverlappedResult(
                connect_handles[index], &connect_overlapped[index],
                &byte_count, FALSE))
        {
            err = ERROR_SUCCESS;
        }
        else
        {
            err = GetLastError();
        }

        connect_pipe_completion(index, err);
        continue;
    }
    else
    {
        fail();
    }
}

The only real complication is that ConnectNamedPipe may finish immediately instead of leaving the operation pending. It then returns zero and GetLastError gives either ERROR_PIPE_CONNECTED, meaning a client connected before the call and the connection is already established, or an error other than ERROR_IO_PENDING, meaning the call failed outright. In both cases you need to reset the event and then handle the result yourself:

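// Declared CALLBACK so it matches PAPCFUNC and can be passed to QueueUserAPC.
void CALLBACK new_pipe(ULONG_PTR dwParam)
{
    DWORD index = (DWORD)dwParam;
    DWORD err;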

    connect_handles[index] = CreateNamedPipe(
        pipe_name, 
        PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
        PIPE_TYPE_MESSAGE | PIPE_WAIT | PIPE_ACCEPT_REMOTE_CLIENTS,
        MAX_INSTANCES,
        512,
        512,
        0,
        NULL);

    if (connect_handles[index] == INVALID_HANDLE_VALUE) fail();

    ZeroMemory(&connect_overlapped[index], sizeof(OVERLAPPED));
    connect_overlapped[index].hEvent = connect_events[index];

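    if (ConnectNamedPipe(connect_handles[index], &connect_overlapped[index])) 
    {
        err = ERROR_SUCCESS;
    }
    else
    {
        err = GetLastError();

        // A failed call must never be reported as success.
        if (err == ERROR_SUCCESS) err = ERROR_INVALID_FUNCTION;

        // A client connected before the call: treat as success.
        if (err == ERROR_PIPE_CONNECTED) err = ERROR_SUCCESS;
    }

    // Anything but "pending" is final, so no event will be signalled for it.
    if (err != ERROR_IO_PENDING) 
    {
        ResetEvent(connect_events[index]);
        connect_pipe_completion(index, err);
    }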
}

The connect_pipe_completion function would create a new task in the task pool to handle the newly connected pipe instance, and then queue an APC to call new_pipe to create a new listening pipe at the same index.

It is possible to reuse existing pipe instances once they are closed but in this situation I don't think it's worth the hassle.
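If it helps, the result handling inside new_pipe can be pulled out into a small pure function, which makes the three possible outcomes easier to check in isolation. This is just a sketch of the same logic, not additional API: normalize_connect_result and its self-check are names invented here, and the error constants are redefined with their winerror.h values only so the snippet builds without the Windows headers.

```c
#include <assert.h>

#ifdef _WIN32
#include <windows.h>
#else
/* Values copied from winerror.h so the sketch also compiles elsewhere. */
typedef unsigned long DWORD;
typedef int BOOL;
#define ERROR_SUCCESS          0UL
#define ERROR_INVALID_FUNCTION 1UL
#define ERROR_PIPE_CONNECTED   535UL
#define ERROR_IO_PENDING       997UL
#endif

/* Hypothetical helper mirroring the logic in new_pipe.
   ok         : return value of ConnectNamedPipe
   last_error : GetLastError() taken right after the call
   Returns ERROR_SUCCESS (connected now), ERROR_IO_PENDING (wait for
   the event) or the real failure code. */
DWORD normalize_connect_result(BOOL ok, DWORD last_error)
{
    if (ok)
        return ERROR_SUCCESS;
    if (last_error == ERROR_SUCCESS)       /* failure must not look like success */
        return ERROR_INVALID_FUNCTION;
    if (last_error == ERROR_PIPE_CONNECTED) /* client got there first */
        return ERROR_SUCCESS;
    return last_error;
}

/* Usage: the mappings the I/O thread relies on. */
void normalize_connect_result_selfcheck(void)
{
    assert(normalize_connect_result(0, ERROR_PIPE_CONNECTED) == ERROR_SUCCESS);
    assert(normalize_connect_result(0, ERROR_IO_PENDING) == ERROR_IO_PENDING);
    assert(normalize_connect_result(0, ERROR_SUCCESS) == ERROR_INVALID_FUNCTION);
}
```

Only the ERROR_IO_PENDING result leaves the slot waiting on its event; every other value goes straight to connect_pipe_completion.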

Harry Johnston answered Nov 19 '22 01:11