Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Graceful exit TcpListener.incoming()

Tags:

tcp

rust

From the rust std net library:

let listener = TcpListener::bind(("127.0.0.1", port)).unwrap();

info!("Opened socket on localhost port {}", port);

// accept connections and process them serially
for stream in listener.incoming() {
    break;
}

info!("closed socket");

How does one make the listener stop listening? It says in the API that when the listener is dropped, it stops. But how do we drop it if incoming() is a blocking call? Preferably without external crates like tokio/mio.

like image 369
WJM Avatar asked Jun 20 '19 19:06

WJM


4 Answers

You'll want to put the TcpListener into non-blocking mode using the set_nonblocking() method, like so:

use std::io;
use std::net::TcpListener;

let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
listener.set_nonblocking(true).expect("Cannot set non-blocking");

for stream in listener.incoming() {
    match stream {
        Ok(s) => {
            // do something with the TcpStream
            handle_connection(s);
        }
        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
            // Decide if we should exit
            break;
            // Decide if we should try to accept a connection again
            continue;
        }
        Err(e) => panic!("encountered IO error: {}", e),
    }
}

Instead of waiting for a connection, the incoming() call will immediately return a Result<> type. If Result is Ok(), then a connection was made and you can process it. If the Result is Err(WouldBlock), this isn't actually an error, there just wasn't a connection pending at the exact moment incoming() checked the socket.

Note that in the WouldBlock case, you may want to put a sleep() or something before continuing, otherwise your program will rapidly poll the incoming() function checking for a connection, resulting in high CPU usage.

Code example adapted from here

like image 189
effect Avatar answered Oct 02 '22 18:10

effect


The standard library doesn't provide an API for this, but there are a few strategies you can use to work around it:

Shut down reads on the socket

You can use platform-specific APIs to shutdown reads on the socket which will cause the incoming iterator to return an error. You can then break out of handling connections when the error is received. For example, on a Unix system:

use std::net::TcpListener;
use std::os::unix::io::AsRawFd;
use std::thread;

let listener = TcpListener::bind("localhost:0")?;

let fd = listener.as_raw_fd();

let handle = thread::spawn(move || {
  for connection in listener.incoming() {
    match connection {
      Ok(connection) => { /* handle connection */ }
      Err(_) => break,
  }
});

libc::shutdown(fd, libc::SHUT_RD);

handle.join();

Force the listener to wake up

Another (cross-platform) trick is to set a variable indicating that you want to stop listening, and then connect to the socket yourself to force the listening thread to wake up. When the listening thread wakes up, it checks the "stop listening" variable, and then exits cleanly if it's set.

use std::net::{TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

let listener = TcpListener::bind("localhost:0")?;
let local_addr = listener.local_addr()?;

let shutdown = Arc::new(AtomicBool::new(false));
let server_shutdown = shutdown.clone();
let handle = thread::spawn(move || {
    for connection in listener.incoming() {
        if server_shutdown.load(Ordering::Relaxed) {
            return;
        }

        match connection {
            Ok(connection) => { /* handle connection */ }
            Err(_) => break,
        }
    }
});

shutdown.store(true, Ordering::Relaxed);
let _ = TcpStream::connect(local_addr);

handle.join().unwrap();
like image 26
euclio Avatar answered Oct 02 '22 17:10

euclio


You can poll your socket with an eventfd, which used for signaling. I wrote a helper for this.

let shutdown = EventFd::new();
let listener = TcpListener::bind("0.0.0.0:12345")?;
let incoming = CancellableIncoming::new(&listener, &shutdown);

for stream in incoming {
    // Your logic
}

// While in other thread
shutdown.add(1);  // Light the shutdown signal, now your incoming loop exits gracefully.
use nix;
use nix::poll::{poll, PollFd, PollFlags};
use nix::sys::eventfd::{eventfd, EfdFlags};
use nix::unistd::{close, write};
use std;
use std::net::{TcpListener, TcpStream};
use std::os::unix::io::{AsRawFd, RawFd};

pub struct EventFd {
    fd: RawFd,
}

impl EventFd {
    pub fn new() -> Self {
        EventFd {
            fd: eventfd(0, EfdFlags::empty()).unwrap(),
        }
    }

    pub fn add(&self, v: i64) -> nix::Result<usize> {
        let b = v.to_le_bytes();
        write(self.fd, &b)
    }
}

impl AsRawFd for EventFd {
    fn as_raw_fd(&self) -> RawFd {
        self.fd
    }
}

impl Drop for EventFd {
    fn drop(&mut self) {
        let _ = close(self.fd);
    }
}

// -----
//
pub struct CancellableIncoming<'a> {
    listener: &'a TcpListener,
    eventfd: &'a EventFd,
}

impl<'a> CancellableIncoming<'a> {
    pub fn new(listener: &'a TcpListener, eventfd: &'a EventFd) -> Self {
        Self { listener, eventfd }
    }
}

impl<'a> Iterator for CancellableIncoming<'a> {
    type Item = std::io::Result<TcpStream>;
    fn next(&mut self) -> Option<std::io::Result<TcpStream>> {
        use nix::errno::Errno;

        let fd = self.listener.as_raw_fd();
        let evfd = self.eventfd.as_raw_fd();
        let mut poll_fds = vec![
            PollFd::new(fd, PollFlags::POLLIN),
            PollFd::new(evfd, PollFlags::POLLIN),
        ];

        loop {
            match poll(&mut poll_fds, -1) {
                Ok(_) => break,
                Err(nix::Error::Sys(Errno::EINTR)) => continue,
                _ => panic!("Error polling"),
            }
        }

        if poll_fds[0].revents().unwrap() == PollFlags::POLLIN {
            Some(self.listener.accept().map(|p| p.0))
        } else if poll_fds[1].revents().unwrap() == PollFlags::POLLIN {
            None
        } else {
            panic!("Can't be!");
        }
    }
}
like image 27
Proton Avatar answered Oct 02 '22 18:10

Proton


Note that setting

listener.set_nonblocking(true).expect("Cannot set non-blocking");

as recommended in @effect's answer automatically sets nonblocking to true on all of the streams as well. Maybe you want this and maybe you don't; I had to set them back to false with

stream.set_nonblocking(false);
like image 20
Maksim Avatar answered Oct 02 '22 18:10

Maksim