Socket handle transfer between independent processes

Tags: c, linux, sockets

I am experimenting with socket programming in a Unix environment. What I am trying to do is:

  1. The client sends a request to the server.
  2. The server sends the client's socket to a worker (an independent process).
  3. The worker replies to the client.

Is this possible?

This scenario works if the worker is a child of the server.

Does it also work if the server and the worker are independent processes? If so, can somebody give me some ideas about how to do it? Are there any samples available for this type of scenario?

Asked by Suyambu on Sep 14 '12 at 12:09


2 Answers

The book The Linux Programming Interface has examples of both sending and receiving file descriptors between unrelated processes, using a Unix domain socket.

For fun, I wrote my own examples from scratch. server.c:

#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <netdb.h>
#include <signal.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>

/* How many concurrent pending connections are allowed */
#define  LISTEN_BACKLOG     32

/* Unix domain socket path length (including NUL byte) */
#ifndef  UNIX_PATH_LEN
#define  UNIX_PATH_LEN    108
#endif

/* Flag to indicate we have received a shutdown request. */
volatile sig_atomic_t     done = 0;

/* Shutdown request signal handler, of the basic type. */
void handle_done_signal(int signum)
{
    if (!done)
        done = signum;

    return;
}

/* Install shutdown request signal handler on signal signum. */
int set_done_signal(const int signum)
{
    struct sigaction act;

    sigemptyset(&act.sa_mask);
    act.sa_handler = handle_done_signal;
    act.sa_flags = 0;

    if (sigaction(signum, &act, NULL) == -1)
        return errno;
    else
        return 0;
}

/* Return NULL for a NULL or empty address, or one starting with
 * -, ?, * or :, so users can use any of those to bind the server
 * to the wildcard address.
*/
char *wildcard(char *address)
{
    /* NULL? */
    if (!address)
        return NULL;

    /* Empty? */
    if (!address[0])
        return NULL;

    /* - or ? or * or : */
    if (address[0] == '-' || address[0] == '?' ||
        address[0] == '*' || address[0] == ':')
        return NULL;

    return address;
}


int main(int argc, char *argv[])
{
    struct addrinfo         hints;
    struct addrinfo        *list, *curr;

    int             listenfd, failure;

    struct sockaddr_un     worker;
    int             workerfd, workerpathlen;

    struct sockaddr_in6     conn;
    socklen_t         connlen;
    struct msghdr         connhdr;
    struct iovec         conniov;
    struct cmsghdr        *connmsg;
    char             conndata[1];
    char             connbuf[CMSG_SPACE(sizeof (int))];
    int             connfd;

    int             result;
    ssize_t             written;

    if (argc != 4) {
        fprintf(stderr, "\n");
        fprintf(stderr, "Usage: %s ADDRESS PORT WORKER\n", argv[0]);
        fprintf(stderr, "This creates a server that binds to ADDRESS and PORT,\n");
        fprintf(stderr, "and passes each connection to a separate unrelated\n");
        fprintf(stderr, "process using a Unix domain socket at WORKER.\n");
        fprintf(stderr, "\n");
        return (argc == 1) ? 0 : 1;
    }

    /* Handle HUP, INT, PIPE, and TERM signals,
     * so when the user presses Ctrl-C, the worker process cannot be contacted,
     * or the user sends a HUP or TERM signal, this server closes down cleanly. */
    if (set_done_signal(SIGINT) ||
        set_done_signal(SIGHUP) ||
        set_done_signal(SIGPIPE) ||
        set_done_signal(SIGTERM)) {
        fprintf(stderr, "Error: Cannot install signal handlers.\n");
        return 1;
    }

    /* Unix domain socket to the worker */
    memset(&worker, 0, sizeof worker);
    worker.sun_family = AF_UNIX;

    workerpathlen = strlen(argv[3]);
    if (workerpathlen < 1) {
        fprintf(stderr, "Worker Unix domain socket path cannot be empty.\n");
        return 1;
    } else
    if (workerpathlen >= UNIX_PATH_LEN) {
        fprintf(stderr, "%s: Worker Unix domain socket path is too long.\n", argv[3]);
        return 1;
    }

    memcpy(&worker.sun_path, argv[3], workerpathlen);
    /* Note: the terminating NUL byte was set by memset(&worker, 0, sizeof worker) above. */

    workerfd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (workerfd == -1) {
        fprintf(stderr, "Cannot create a Unix domain socket: %s.\n", strerror(errno));
        return 1;
    }
    if (connect(workerfd, (const struct sockaddr *)(&worker), (socklen_t)sizeof worker) == -1) {
        fprintf(stderr, "Cannot connect to %s: %s.\n", argv[3], strerror(errno));
        close(workerfd);
        return 1;
    }

    /* Initialize the address info hints */
    memset(&hints, 0, sizeof hints);
    hints.ai_family = AF_UNSPEC;        /* IPv4 or IPv6 */
    hints.ai_socktype = SOCK_STREAM;    /* Stream socket */
    hints.ai_flags = AI_PASSIVE        /* Wildcard ADDRESS */
                   | AI_ADDRCONFIG          /* Only return IPv4/IPv6 if available locally */
                   | AI_NUMERICSERV        /* Port must be a number */
                   ;
    hints.ai_protocol = 0;            /* Any protocol */

    /* Obtain the chain of possible addresses and ports to bind to */
    result = getaddrinfo(wildcard(argv[1]), argv[2], &hints, &list);
    if (result) {
        fprintf(stderr, "%s %s: %s.\n", argv[1], argv[2], gai_strerror(result));
        close(workerfd);
        return 1;
    }

    /* Bind to the first working entry in the chain */
    listenfd = -1;
    failure = 0;
    for (curr = list; curr != NULL; curr = curr->ai_next) {
        listenfd = socket(curr->ai_family, curr->ai_socktype, curr->ai_protocol);
        if (listenfd == -1) {
            /* Remember the first error, so it can be reported later. */
            if (!failure)
                failure = errno;
            continue;
        }

        if (bind(listenfd, curr->ai_addr, curr->ai_addrlen) == -1) {
            if (!failure)
                failure = errno;
            close(listenfd);
            listenfd = -1;
            continue;
        }

        /* Bound successfully */
        break;
    }

    /* No entries to try at all? */
    if (listenfd == -1 && !failure)
        failure = EINVAL;

    /* Discard the chain, as we don't need it anymore.
     * Note: curr is no longer valid after this. */
    freeaddrinfo(list);

    /* Failed to bind? */
    if (listenfd == -1) {
        fprintf(stderr, "Cannot bind to %s port %s: %s.\n", argv[1], argv[2], strerror(failure));
        close(workerfd);
        return 1;
    }

    if (listen(listenfd, LISTEN_BACKLOG) == -1) {
        fprintf(stderr, "Cannot listen for incoming connections to %s port %s: %s.\n", argv[1], argv[2], strerror(errno));
        close(listenfd);
        close(workerfd);
        return 1;
    }

    printf("Now waiting for incoming connections to %s port %s\n", argv[1], argv[2]);
    fflush(stdout);

    while (!done) {

        memset(&conn, 0, sizeof conn);
        connlen = sizeof conn;

        connfd = accept(listenfd, (struct sockaddr *)&conn, &connlen);
        if (connfd == -1) {

            /* Did we just receive a signal? */
            if (errno == EINTR)
                continue;

            /* Report a connection failure. */
            printf("Failed to accept a connection: %s\n", strerror(errno));
            fflush(stdout);

            continue;
        }

        /* Construct the message to the worker process. */
        memset(&connhdr, 0, sizeof connhdr);
        memset(&conniov, 0, sizeof conniov);
        memset(&connbuf, 0, sizeof connbuf);

        conniov.iov_base = conndata;    /* Data payload to send */
        conniov.iov_len  = 1;        /* We send just one (dummy) byte, */
        conndata[0] = 0;        /* a zero. */

        /* Construct the message (header) */
        connhdr.msg_name       = NULL;        /* No optional address */
        connhdr.msg_namelen    = 0;        /* No optional address */
        connhdr.msg_iov        = &conniov;    /* Normal payload - at least one byte */
        connhdr.msg_iovlen     = 1;        /* Only one vector in conniov */
        connhdr.msg_control    = connbuf;    /* Ancillary data */
        connhdr.msg_controllen = sizeof connbuf;

        /* Construct the ancillary data needed to pass one descriptor. */
        connmsg = CMSG_FIRSTHDR(&connhdr);
        connmsg->cmsg_level = SOL_SOCKET;
        connmsg->cmsg_type = SCM_RIGHTS;
        connmsg->cmsg_len = CMSG_LEN(sizeof (int));
        /* Copy the descriptor to the ancillary data. */
        memcpy(CMSG_DATA(connmsg), &connfd, sizeof (int));

        /* Update the message to reflect the ancillary data length */
        connhdr.msg_controllen = connmsg->cmsg_len;

        do {
            written = sendmsg(workerfd, &connhdr, MSG_NOSIGNAL);
        } while (written == (ssize_t)-1 && errno == EINTR);
        if (written == (ssize_t)-1) {
            const char *const errmsg = strerror(errno);

            /* Lost connection to the other end? */
            if (!done) {
                if (errno == EPIPE)
                    done = SIGPIPE;
                else
                    done = -1;
            }

            printf("Cannot pass connection to worker: %s.\n", errmsg);
            fflush(stdout);

            close(connfd);

            /* Break main loop. */
            break;
        }

        /* Since the descriptor has been transferred to the other process,
         * we can close our end. */
        do {
            result = close(connfd);
        } while (result == -1 && errno == EINTR);
        if (result == -1)
            printf("Error closing leftover connection descriptor: %s.\n", strerror(errno));

        printf("Connection transferred to the worker process.\n");
        fflush(stdout);
    }

    /* Shutdown. */

    close(listenfd);
    close(workerfd);

    switch (done) {
    case SIGTERM:
        printf("Terminated.\n");
        break;

    case SIGPIPE:
        printf("Lost connection.\n");
        break;

    case SIGHUP:
        printf("Hanging up.\n");
        break;

    case SIGINT:
        printf("Interrupted; exiting.\n");
        break;

    default:
        printf("Exiting.\n");
    }

    return 0;
}

and worker.c:

#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netdb.h>
#include <signal.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>

/* How many concurrent pending connections are allowed */
#define  LISTEN_BACKLOG     32

/* Unix domain socket path length (including NUL byte) */
#ifndef  UNIX_PATH_LEN
#define  UNIX_PATH_LEN    108
#endif

/* Flag to indicate we have received a shutdown request. */
volatile sig_atomic_t     done = 0;

/* Shutdown request signal handler, of the basic type. */
void handle_done_signal(int signum)
{
    if (!done)
        done = signum;

    return;
}

/* Install shutdown request signal handler on signal signum. */
int set_done_signal(const int signum)
{
    struct sigaction act;

    sigemptyset(&act.sa_mask);
    act.sa_handler = handle_done_signal;
    act.sa_flags = 0;

    if (sigaction(signum, &act, NULL) == -1)
        return errno;
    else
        return 0;
}

/* Helper function to duplicate file descriptors.
 * Returns 0 if success, errno error code otherwise.
*/
static int copy_fd(const int fromfd, const int tofd)
{
    int result;

    if (fromfd == tofd)
        return 0;

    if (fromfd == -1 || tofd == -1)
        return errno = EINVAL;

    do {
        result = dup2(fromfd, tofd);
    } while (result == -1 && errno == EINTR);
    if (result == -1)
        return errno;

    return 0;
}

int main(int argc, char *argv[])
{
    struct sockaddr_un     worker;
    int             workerfd, workerpathlen;
    int             serverfd, clientfd;

    pid_t             child;

    struct msghdr         msghdr;
    struct iovec         msgiov;
    struct cmsghdr        *cmsg;
    char             data[1];
    char             ancillary[CMSG_SPACE(sizeof (int))];
    ssize_t             received;

    if (argc < 3) {
        fprintf(stderr, "\n");
        fprintf(stderr, "Usage: %s WORKER COMMAND [ ARGS .. ]\n", argv[0]);
        fprintf(stderr, "This creates a worker that receives connections\n");
        fprintf(stderr, "from Unix domain socket WORKER.\n");
        fprintf(stderr, "Each connection is served by COMMAND, with the\n");
        fprintf(stderr, "connection connected to its standard input and output.\n");
        fprintf(stderr, "\n");
        return (argc == 1) ? 0 : 1;
    }

    /* Handle HUP, INT, PIPE, and TERM signals,
     * so when the user presses Ctrl-C, or sends a HUP or TERM signal,
     * this worker closes down cleanly. */
    if (set_done_signal(SIGINT) ||
        set_done_signal(SIGHUP) ||
        set_done_signal(SIGPIPE) ||
        set_done_signal(SIGTERM)) {
        fprintf(stderr, "Error: Cannot install signal handlers.\n");
        return 1;
    }

    /* Unix domain socket */
    memset(&worker, 0, sizeof worker);
    worker.sun_family = AF_UNIX;

    workerpathlen = strlen(argv[1]);
    if (workerpathlen < 1) {
        fprintf(stderr, "Worker Unix domain socket path cannot be empty.\n");
        return 1;
    } else
    if (workerpathlen >= UNIX_PATH_LEN) {
        fprintf(stderr, "%s: Worker Unix domain socket path is too long.\n", argv[1]);
        return 1;
    }

    memcpy(&worker.sun_path, argv[1], workerpathlen);
    /* Note: the terminating NUL byte was set by memset(&worker, 0, sizeof worker) above. */

    workerfd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (workerfd == -1) {
        fprintf(stderr, "Cannot create a Unix domain socket: %s.\n", strerror(errno));
        return 1;
    }
    if (bind(workerfd, (const struct sockaddr *)(&worker), (socklen_t)sizeof worker) == -1) {
        fprintf(stderr, "%s: %s.\n", argv[1], strerror(errno));
        close(workerfd);
        return 1;
    }
    if (listen(workerfd, LISTEN_BACKLOG) == -1) {
        fprintf(stderr, "%s: Cannot listen for messages: %s.\n", argv[1], strerror(errno));
        close(workerfd);
        return 1;
    }

    printf("Listening for descriptors on %s.\n", argv[1]);
    fflush(stdout);

    while (!done) {

        serverfd = accept(workerfd, NULL, NULL);
        if (serverfd == -1) {

            if (errno == EINTR)
                continue;

            printf("Failed to accept a connection from the server: %s.\n", strerror(errno));
            fflush(stdout);
            continue;
        }

        printf("Connection from the server.\n");
        fflush(stdout);

        while (!done && serverfd != -1) {

            memset(&msghdr, 0, sizeof msghdr);
            memset(&msgiov, 0, sizeof msgiov);

            msghdr.msg_name       = NULL;
            msghdr.msg_namelen    = 0;
            msghdr.msg_control    = &ancillary;
            msghdr.msg_controllen = sizeof ancillary;

            cmsg = CMSG_FIRSTHDR(&msghdr);
            cmsg->cmsg_level = SOL_SOCKET;
            cmsg->cmsg_type = SCM_RIGHTS;
            cmsg->cmsg_len = CMSG_LEN(sizeof (int));

            msghdr.msg_iov    = &msgiov;
            msghdr.msg_iovlen = 1;

            msgiov.iov_base    = &data;
            msgiov.iov_len = 1; /* Just one byte */

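            received = recvmsg(serverfd, &msghdr, 0);

            if (received == (ssize_t)-1) {
                if (errno == EINTR)
                    continue;

                printf("Error receiving a message from server: %s.\n", strerror(errno));
                fflush(stdout);
                break;
            }

            /* Zero bytes means the server closed its end of the connection. */
            if (received == 0)
                break;

            /* Accept only a single SCM_RIGHTS message carrying one descriptor. */
            cmsg = CMSG_FIRSTHDR(&msghdr);
            if (!cmsg ||
                cmsg->cmsg_level != SOL_SOCKET ||
                cmsg->cmsg_type != SCM_RIGHTS ||
                cmsg->cmsg_len != CMSG_LEN(sizeof (int))) {
                printf("Received a bad message from server.\n");
                fflush(stdout);
                break;
            }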

            memcpy(&clientfd, CMSG_DATA(cmsg), sizeof (int));

            printf("Executing command with descriptor %d: ", clientfd);
            fflush(stdout);

            child = fork();
            if (child == (pid_t)-1) {
                printf("Fork failed: %s.\n", strerror(errno));
                fflush(stdout);
                close(clientfd);
                break;
            }

            if (!child) {
                /* This is the child process. */

                close(workerfd);
                close(serverfd);

                if (copy_fd(clientfd, STDIN_FILENO) ||
                    copy_fd(clientfd, STDOUT_FILENO) ||
                    copy_fd(clientfd, STDERR_FILENO))
                    return 126; /* Exits the child process */

                if (clientfd != STDIN_FILENO &&
                    clientfd != STDOUT_FILENO &&
                    clientfd != STDERR_FILENO)
                    close(clientfd);

                execvp(argv[2], argv + 2);

                return 127; /* Exits the child process */
            }

            printf("Done.\n");
            fflush(stdout);

            close(clientfd);
        }

        close(serverfd);

        printf("Closed connection to server.\n");
        fflush(stdout);
    }

    /* Shutdown. */
    close(workerfd);

    switch (done) {
    case SIGTERM:
        printf("Terminated.\n");
        break;

    case SIGPIPE:
        printf("Lost connection.\n");
        break;

    case SIGHUP:
        printf("Hanging up.\n");
        break;

    case SIGINT:
        printf("Interrupted; exiting.\n");
        break;

    default:
        printf("Exiting.\n");
    }

    return 0;
}

You can compile them using

gcc -W -Wall -O3 worker.c -o worker
gcc -W -Wall -O3 server.c -o server

and run using e.g.

rm -f connection
./worker connection /bin/date &
./server 127.0.0.1 8000 connection &

As you can see, the ./worker and ./server processes are completely separate. I recommend starting them from different windows, leaving out the & at the end of each command line (it runs the command in the background). Here, connection is the path or name of the Unix domain socket used to transfer the network connection file descriptor. /bin/date is the command (an executable, not a shell command) that is executed for each connection, with standard input, output, and error connected directly to the network client. This is very much like what inetd or xinetd does, just bare-bones.

You can test the connection via e.g.

nc 127.0.0.1 8000

or

telnet 127.0.0.1 8000

The /bin/date command above just outputs the current date to standard output, but if you use a slightly cleverer worker command, say

rm -f connection
./worker connection printf 'HTTP/1.0 200 Ok\r\nConnection: close\r\nContent-Type: text/html; charset=UTF-8\r\nContent-Length: 67\r\n\r\n<html><head><title>Works</title></head><body>Works!</body></html>\r\n'

you can use your browser (http://127.0.0.1:8000/) to test.
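The Content-Length in that printf command has to match the body byte count exactly (67 here, including the trailing \r\n). As a minimal sketch, assuming you would rather compute the header than count by hand, a worker command written in C could build the same response like this. The name format_http_response is hypothetical, not part of server.c or worker.c:

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: format a minimal HTTP/1.0 response around body,
 * computing Content-Length from the body instead of hard-coding it.
 * Returns the number of bytes stored in out (excluding the NUL byte),
 * or -1 if out is too small. */
static int format_http_response(char *out, size_t outlen, const char *body)
{
    int n = snprintf(out, outlen,
                     "HTTP/1.0 200 Ok\r\n"
                     "Connection: close\r\n"
                     "Content-Type: text/html; charset=UTF-8\r\n"
                     "Content-Length: %zu\r\n"
                     "\r\n"
                     "%s",
                     strlen(body), body);

    /* snprintf() reports the length it wanted; detect truncation. */
    if (n < 0 || (size_t)n >= outlen)
        return -1;

    return n;
}
```

Since the worker connects the client socket to standard output, such a command would only need to write the formatted buffer to stdout and exit.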

The design is such that worker.c listens on a Unix domain socket (connection in the current working directory in all the example commands above). It first accepts a connection (from a single server), then expects each incoming byte to be accompanied by SCM_RIGHTS ancillary data containing the file descriptor that refers to the client connection. If there is a problem, or the connection is dropped, it goes back to waiting for a new connection from a server. If it receives a client descriptor, it forks a child process, redirects the child's standard input, output, and error to the client descriptor, and executes the command specified on the ./worker command line. The parent process closes its copy of the client descriptor and goes back to waiting for a new one.

server.c listens for incoming connections on the IPv4 or IPv6 address and port specified on its command line. When it gets a connection, it transfers the connected file descriptor to the worker.c process above via the Unix domain socket specified on the command line (connection), closes its own copy, and goes back to waiting for a new connection. Note that if the server loses the connection to the worker, it aborts, so always start ./worker before ./server.
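If you want the descriptor-passing part in isolation, here is a condensed sketch of the same SCM_RIGHTS exchange as two helpers. The names send_fd, recv_fd, fd_control and demo_fd_round_trip are hypothetical and do not appear in the listings; the demo uses socketpair() inside one process purely for illustration, whereas the real programs use a named Unix domain socket between two processes.

```c
#include <errno.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>

/* Control buffer for one descriptor; the union keeps it aligned. */
union fd_control {
    char buf[CMSG_SPACE(sizeof (int))];
    struct cmsghdr align;
};

/* Hypothetical helper: pass descriptor fd over Unix domain socket sock.
 * Returns 0 on success, -1 with errno set otherwise. */
static int send_fd(int sock, int fd)
{
    union fd_control control;
    char dummy = 0;
    struct iovec iov = { .iov_base = &dummy, .iov_len = 1 };
    struct msghdr msg;
    struct cmsghdr *cmsg;
    ssize_t n;

    memset(&msg, 0, sizeof msg);
    memset(&control, 0, sizeof control);
    msg.msg_iov = &iov;             /* One dummy byte of real payload */
    msg.msg_iovlen = 1;
    msg.msg_control = control.buf;
    msg.msg_controllen = sizeof control.buf;

    cmsg = CMSG_FIRSTHDR(&msg);
    cmsg->cmsg_level = SOL_SOCKET;
    cmsg->cmsg_type = SCM_RIGHTS;
    cmsg->cmsg_len = CMSG_LEN(sizeof (int));
    memcpy(CMSG_DATA(cmsg), &fd, sizeof (int));

    do {
        n = sendmsg(sock, &msg, 0);
    } while (n == -1 && errno == EINTR);
    return (n == -1) ? -1 : 0;
}

/* Hypothetical helper: receive one descriptor from sock.
 * Returns the new descriptor, or -1 with errno set otherwise. */
static int recv_fd(int sock)
{
    union fd_control control;
    char dummy;
    struct iovec iov = { .iov_base = &dummy, .iov_len = 1 };
    struct msghdr msg;
    struct cmsghdr *cmsg;
    ssize_t n;
    int fd;

    memset(&msg, 0, sizeof msg);
    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;
    msg.msg_control = control.buf;
    msg.msg_controllen = sizeof control.buf;

    do {
        n = recvmsg(sock, &msg, 0);
    } while (n == -1 && errno == EINTR);
    if (n == -1)
        return -1;
    cmsg = CMSG_FIRSTHDR(&msg);
    if (n == 0 || !cmsg || cmsg->cmsg_level != SOL_SOCKET ||
        cmsg->cmsg_type != SCM_RIGHTS ||
        cmsg->cmsg_len != CMSG_LEN(sizeof (int))) {
        errno = EBADMSG;        /* Peer closed, or no descriptor attached */
        return -1;
    }
    memcpy(&fd, CMSG_DATA(cmsg), sizeof (int));
    return fd;
}

/* Demo: pass a pipe's write end across a socketpair, then write through it. */
static int demo_fd_round_trip(void)
{
    int sv[2], pipefd[2], copy, ok = -1;
    char c = 0;

    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1)
        return -1;
    if (pipe(pipefd) == 0) {
        if (send_fd(sv[0], pipefd[1]) == 0 && (copy = recv_fd(sv[1])) != -1) {
            if (write(copy, "x", 1) == 1 && read(pipefd[0], &c, 1) == 1 && c == 'x')
                ok = 0;
            close(copy);
        }
        close(pipefd[0]);
        close(pipefd[1]);
    }
    close(sv[0]);
    close(sv[1]);
    return ok;
}
```

The received descriptor usually has a different number than the one that was sent, but it refers to the same open file, which is why the byte written through the copy shows up on the read end of the original pipe. To try it, call demo_fd_round_trip() from a main() and check that it returns 0.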

Both server.c and worker.c install simple signal handlers, so you can tell them to exit by sending them a HUP or INT signal (Ctrl-C, if you run the commands in the foreground in separate terminals or shells). They also have reasonable error checking, so when they exit, they tell you exactly why. To be honest, I installed the handlers because that way you WILL occasionally receive EINTR errors, and unless you treat them correctly (retrying the relevant syscalls unless asked to exit), your processes will be fragile and will fail at the slightest change in conditions. Be robust; it's not that hard, and the results are much more user- and sysadmin-friendly.
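To illustrate the "retry unless asked to exit" pattern on its own, here is a minimal sketch of a write loop. The helper name write_all is hypothetical; the done flag mirrors the one in the listings and is redefined here only so the fragment compiles by itself.

```c
#include <errno.h>
#include <signal.h>
#include <stddef.h>
#include <sys/types.h>
#include <unistd.h>

/* Stand-in for the shutdown flag set by the signal handler in the listings. */
static volatile sig_atomic_t done = 0;

/* Hypothetical helper: write exactly len bytes from buf to fd.
 * Retries when write() is interrupted by a signal (EINTR), unless a
 * shutdown was requested, and continues after short writes.
 * Returns 0 on success, -1 with errno set otherwise. */
static int write_all(int fd, const void *buf, size_t len)
{
    const char *p = buf;

    while (len > 0) {
        ssize_t n = write(fd, p, len);

        if (n == -1) {
            /* Interrupted before anything was written: retry,
             * unless the signal was a request to exit. */
            if (errno == EINTR && !done)
                continue;
            return -1;
        }

        /* Short write: advance past what was written and go on. */
        p += n;
        len -= (size_t)n;
    }

    return 0;
}
```

The same loop shape applies to read(), accept(), sendmsg() and recvmsg(); only the bookkeeping for partial transfers differs.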

I hope you find the code interesting. I'd be happy to elaborate, if you have any questions on the details. Just remember that I wrote it from scratch in very little time, and it is only intended as a simple example. There is a lot of room for improvement.

Answered by Nominal Animal on Nov 09 '22 at 06:11


A Unix domain socket can be used to pass file descriptors between processes.

Answered by rpbear on Nov 09 '22 at 07:11