I have a test that opens and listens to a Unix Domain Socket. The socket is opened and reads data without issues, but it doesn't shutdown gracefully.
This is the error I get when I try to run the test a second time:
thread 'test_1' panicked at 'called
Result::unwrap()
on anErr
value: Error { repr: Os { code: 48, message: "Address already in use" } }', ../src/libcore/result.rs:799 note: Run withRUST_BACKTRACE=1
for a backtrace.
The code is available at the Rust playground and there's a Github Gist for it.
use std::io::prelude::*;
use std::thread;
use std::net::Shutdown;
use std::os::unix::net::{UnixStream, UnixListener};
Test Case:
#[test]
fn test_1() {
driver();
assert_eq!("1", "2");
}
Main entry point function
fn driver() {
let listener = UnixListener::bind("/tmp/my_socket.sock").unwrap();
thread::spawn(|| socket_server(listener));
// send a message
busy_work(3);
// try to disconnect the socket
let drop_stream = UnixStream::connect("/tmp/my_socket.sock").unwrap();
let _ = drop_stream.shutdown(Shutdown::Both);
}
Function to send data in intervals
#[allow(unused_variables)]
fn busy_work(threads: i32) {
// Make a vector to hold the children which are spawned.
let mut children = vec![];
for i in 0..threads {
// Spin up another thread
children.push(thread::spawn(|| socket_client()));
}
for child in children {
// Wait for the thread to finish. Returns a result.
let _ = child.join();
}
}
fn socket_client() {
let mut stream = UnixStream::connect("/tmp/my_socket.sock").unwrap();
stream.write_all(b"hello world").unwrap();
}
Function to handle data
fn handle_client(mut stream: UnixStream) {
let mut response = String::new();
stream.read_to_string(&mut response).unwrap();
println!("got response: {:?}", response);
}
Server socket that listens to incoming messages
#[allow(unused_variables)]
fn socket_server(listener: UnixListener) {
// accept connections and process them, spawning a new thread for each one
for stream in listener.incoming() {
match stream {
Ok(mut stream) => {
/* connection succeeded */
let mut response = String::new();
stream.read_to_string(&mut response).unwrap();
if response.is_empty() {
break;
} else {
thread::spawn(|| handle_client(stream));
}
}
Err(err) => {
/* connection failed */
break;
}
}
}
println!("Breaking out of socket_server()");
drop(listener);
}
When a socket is no longer of use, the process can discard it by calling close(S): close(s); If data is associated with a socket which promises reliable delivery (a stream socket), the system continues to attempt to transfer the data.
To create a UNIX domain socket, use the socket function and specify AF_UNIX as the domain for the socket. The z/TPF system supports a maximum number of 16,383 active UNIX domain sockets at any time. After a UNIX domain socket is created, you must bind the socket to a unique file path by using the bind function.
Please learn to create a minimal reproducible example and then take the time to do so. In this case, there's no need for threads or functions or testing frameworks; running this entire program twice reproduces the error:
use std::os::unix::net::UnixListener;
fn main() {
UnixListener::bind("/tmp/my_socket.sock").unwrap();
}
If you look at the filesystem before and after the test, you will see that the file /tmp/my_socket.sock
is not present before the first run and it is present before the second run. Deleting the file allows the program to run to completion again (at which point it recreates the file).
This issue is not unique to Rust:
Note that, once created, this socket file will continue to exist, even after the server exits. If the server subsequently restarts, the file prevents re-binding:
[...]
So, servers should unlink the socket pathname prior to binding it.
You could choose to add some wrapper around the socket that would automatically delete it when it is dropped or create a temporary directory that is cleaned when it is dropped, but I'm not sure how well that would work. You could also create a wrapper function that deletes the file before it opens the socket.
use std::path::{Path, PathBuf};
struct DeleteOnDrop {
path: PathBuf,
listener: UnixListener,
}
impl DeleteOnDrop {
fn bind(path: impl AsRef<Path>) -> std::io::Result<Self> {
let path = path.as_ref().to_owned();
UnixListener::bind(&path).map(|listener| DeleteOnDrop { path, listener })
}
}
impl Drop for DeleteOnDrop {
fn drop(&mut self) {
// There's no way to return a useful error here
let _ = std::fs::remove_file(&self.path).unwrap();
}
}
You may also want to consider implementing Deref
/ DerefMut
to make this into a smart pointer for sockets:
impl std::ops::Deref for DeleteOnDrop {
type Target = UnixListener;
fn deref(&self) -> &Self::Target {
&self.listener
}
}
impl std::ops::DerefMut for DeleteOnDrop {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.listener
}
}
This is much simpler:
use std::path::Path;
fn bind(path: impl AsRef<Path>) -> std::io::Result<UnixListener> {
let path = path.as_ref();
std::fs::remove_file(path)?;
UnixListener::bind(path)
}
Note that you can combine the two solutions, such that the socket is deleted before creation and when it's dropped.
I think that deleting during creation is a less-optimal solution: if you ever start a second server, you'll prevent the first server from receiving any more connections. It's probably better to error and tell the user instead.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With