Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Rust TCP socket server only working with one connection

Tags:

rust

I'm new to Rust and I'm trying to configure a simple tcp socket server which will listen to connections and will reply with the same message that received.

The thing is, this works as I want except when connecting with multiple clients.. The first client that connects will send and receive the messages but if a second client connects, the first one keeps working but the second never receives messages, in fact the message never gets in the code that will handle it. And if I disconnect the first socket, the server will start spamming forever that received a message from the first socket with the same content than the last message it sent.

I am pretty sure I did something wrong in my code but I can't find it

This is my server struct:

use std::collections::HashMap;
use std::io::Read;
use std::io::Write;
use std::net::Shutdown;
use std::net::TcpListener;
use std::net::TcpStream;
use std::str;
use std::sync::{Arc, RwLock};
use threadpool::ThreadPool;

#[derive(Clone, Debug)]
pub struct Server {
    id: Arc<RwLock<u32>>,
    connections: Arc<RwLock<HashMap<u32, TcpStream>>>,
    url: String,
    thread_pool: ThreadPool
}

impl Server {
    pub fn new(url: String) -> Server {
        let server = Server {
            id: Arc::new(RwLock::new(0)),
            connections: Arc::new(RwLock::new(HashMap::new())),
            url,
            thread_pool: ThreadPool::new(10)
        };

        server
    }

    pub fn start(&self) {
        let listener = TcpListener::bind(&self.url).expect("Could not start the server");

        println!("Server started succesfully");

        for stream in listener.incoming() {
            match stream {
                Ok(stream) => {
                    let mut self_clone = self.clone();

                    self.thread_pool.execute(move || {
                        self_clone.on_client_connect(stream.try_clone().unwrap());
                    });
                }
                Err(error) => eprintln!("Error when tried to use stream. Error = {:?}", error),
            }
        }
    }

    fn on_client_connect(&mut self, stream: TcpStream) {
        println!("Client connected from {}", stream.local_addr().unwrap());

        let mut id = self.id.write().unwrap();
        {
            *id += 1;
        }

        self.connections
            .write()
            .unwrap()
            .insert(*id, stream.try_clone().unwrap());

        let mut stream = stream.try_clone().unwrap();

        let mut buffer = [0; 1024];

        while match stream.read(&mut buffer) {
            Ok(size) => {
                println!(
                    "Message received from {} - {}",
                    id,
                    str::from_utf8(&buffer).unwrap()
                );

                stream.write_all(&buffer[0..size]).unwrap();

                true
            }
            Err(error) => {
                println!(
                    "Error when reading message from socket. Error = {:?}",
                    error
                );
                stream.shutdown(Shutdown::Both).unwrap();

                false
            }
        } { }
    }
}

And in my main.rs I'm just calling the connect function and the server starts working

like image 706
Gustavo Henrique Morales de Me Avatar asked Dec 09 '25 08:12

Gustavo Henrique Morales de Me


1 Answers

In this piece of code in your on_client_connect function, you're aquiring a read lock for self.id:

let mut id = self.id.write().unwrap();
{
    *id += 1;
}

However, the id variable, which holds the lock, is not released until it drops at the end of the function. This means that all other clients will wait for this lock to be released, which won't happen until the function currently holding the lock has completed (which happens when that client disconnects).

You can solve this by rewriting the above code to only keep the lock while incrementing, and then storing the ID value in a variable:

let id: u32 = {
    let mut id_lock = self.id.write.unwrap();
    *id_lock += 1;
    *id_lock

    // id_lock is dropped at the end of this block, so the lock is released
};

Even better, you can use AtomicU32, which is still thread-safe yet does not require locking at all:

use std::sync::atomic::{AtomicU32, Ordering};

struct {
    id: Arc<AtomicU32>,
    // ...
}
// Fetch previous value, then increment `self.id` by one, in a thread-safe and lock-free manner
let id: u32 = self.id.fetch_add(1, Ordering::Relaxed);

Also, when the connection is closed your code goes into an infinite loop because you're not handling the case where stream.read() returns Ok(0), which indicates that the connection was closed:

while match stream.read(&mut buffer) {
    Ok(0) => false, // handle connection closed...
    Ok(size) => { /* ... */ }
    Err(err) => { /* ... */ }
} {}
like image 62
Frxstrem Avatar answered Dec 12 '25 09:12

Frxstrem



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!