Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is select() + non-blocking write() possible on a blocking pipe or socket?

The situation is that I have a blocking pipe or socket fd to which I want to write() without blocking, so I do a select() first, but that still doesn't guarantee that write() will not block.

Here is the data I have gathered. Even if select() indicates that writing is possible, writing more than PIPE_BUF bytes can block. However, writing at most PIPE_BUF bytes doesn't seem to block in practice, but it is not mandated by the POSIX spec.

That only specifies atomic behavior. Python(!) documentation states that:

Files reported as ready for writing by select(), poll() or similar interfaces in this module are guaranteed to not block on a write of up to PIPE_BUF bytes. This value is guaranteed by POSIX to be at least 512.

In the following test program, set BUF_BYTES to say 100000 to block in write() on Linux, FreeBSD or Solaris following a successful select. I assume that named pipes have similar behavior to anonymous pipes.

Unfortunately the same can happen with blocking sockets. Call test_socket() in main() and use a largish BUF_BYTES (100000 is good here too). It's unclear whether there is a safe buffer size like PIPE_BUF for sockets.

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <limits.h>
#include <stdio.h>
#include <sys/select.h>
#include <unistd.h>

#define BUF_BYTES PIPE_BUF
char buf[BUF_BYTES];

int
probe_with_select(int nfds, fd_set *readfds, fd_set *writefds,
                  fd_set *exceptfds)
{
    struct timeval timeout = {0, 0};
    int n_found = select(nfds, readfds, writefds, exceptfds, &timeout);
    if (n_found == -1) {
        perror("select");
    }
    return n_found;
}

void
check_if_readable(int fd)
{
    fd_set fdset;
    FD_ZERO(&fdset);
    FD_SET(fd, &fdset);
    printf("select() for read on fd %d returned %d\n",
           fd, probe_with_select(fd + 1, &fdset, 0, 0));
}

void
check_if_writable(int fd)
{
    fd_set fdset;
    FD_ZERO(&fdset);
    FD_SET(fd, &fdset);
    int n_found = probe_with_select(fd + 1, 0, &fdset, 0);
    printf("select() for write on fd %d returned %d\n", fd, n_found);
    /* if (n_found == 0) { */
    /*     printf("sleeping\n"); */
    /*     sleep(2); */
    /*     int n_found = probe_with_select(fd + 1, 0, &fdset, 0); */
    /*     printf("retried select() for write on fd %d returned %d\n",  */
    /*            fd, n_found); */
    /* } */
}

void
test_pipe(void)
{
    int pipe_fds[2];
    size_t written;
    int i;
    if (pipe(pipe_fds)) {
        perror("pipe failed");
        _exit(1);
    }
    printf("read side pipe fd: %d\n", pipe_fds[0]);
    printf("write side pipe fd: %d\n", pipe_fds[1]);
    for (i = 0; ; i++) {
        printf("i = %d\n", i);
        check_if_readable(pipe_fds[0]);
        check_if_writable(pipe_fds[1]);
        written = write(pipe_fds[1], buf, BUF_BYTES);
        if (written == -1) {
            perror("write");
            _exit(-1);
        }
        printf("written %d bytes\n", written);
    }
}

void
serve()
{
    int listenfd = 0, connfd = 0;
    struct sockaddr_in serv_addr;

    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    memset(&serv_addr, '0', sizeof(serv_addr));

    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    serv_addr.sin_port = htons(5000);

    bind(listenfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));

    listen(listenfd, 10);

    connfd = accept(listenfd, (struct sockaddr*)NULL, NULL);

    sleep(10);
}

int
connect_to_server()
{
    int sockfd = 0, n = 0;
    struct sockaddr_in serv_addr;

    if((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        perror("socket");
        exit(-1);
    }

    memset(&serv_addr, '0', sizeof(serv_addr));

    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(5000);

    if(inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr) <= 0) {
        perror("inet_pton");
        exit(-1);
    }

    if (connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
        perror("connect");
        exit(-1);
    }

    return sockfd;
}

void
test_socket(void)
{
    if (fork() == 0)  {
        serve();
    } else {
        int fd;
        int i;
        int written;
        sleep(1);
        fd = connect_to_server();

        for (i = 0; ; i++) {
            printf("i = %d\n", i);
            check_if_readable(fd);
            check_if_writable(fd);
            written = write(fd, buf, BUF_BYTES);
            if (written == -1) {
                perror("write");
                _exit(-1);
            }
            printf("written %d bytes\n", written);
        }
    }
}

int
main(void)
{
    test_pipe();
    /* test_socket(); */
}
like image 702
melisgl Avatar asked Oct 19 '22 12:10

melisgl


2 Answers

Unless you wish to send one byte at a time whenever select() says the fd is ready for writes, there is really no way to know how much you will be able to send and even then it is theoretically possible (at least in the documentation, if not in the real world) for select to say it's ready for writes and then the condition to change in the time between select() and write().

Non blocking sends are the solution here and you don't need to change your file descriptor to non blocking mode to send one message in non-blocking form if you change from using write() to send(). The only thing you need to change is to add the MSG_DONTWAIT flag to the send call and that will make the one send non-blocking without altering your socket's properties. You don't even need to use select() at all in this case either since the send() call will give you all the information you need in the return code - if you get a return code of -1 and the errno is EAGAIN or EWOULDBLOCK then you know you can't send any more.

like image 71
ckolivas Avatar answered Nov 02 '22 04:11

ckolivas


The Posix section you cite clearly states:

[for pipes] If the O_NONBLOCK flag is clear, a write request may cause the thread to block, but on normal completion it shall return nbyte.

[for streams, which presumably includes streaming sockets] If O_NONBLOCK is clear, and the STREAM cannot accept data (the STREAM write queue is full due to internal flow control conditions), write() shall block until data can be accepted.

The Python documentation you quoted can therefore only apply to non-blocking mode only. But as you're not using Python it has no relevance anyway.

like image 21
user207421 Avatar answered Nov 02 '22 06:11

user207421