
Lifetime troubles sharing references between threads

Tags: rust, lifetime

I've got a thread that launches worker threads; all of them are expected to live forever. Each worker thread maintains its own list of Sockets.

Some operations require that I traverse all sockets currently alive, but I'm having trouble with lifetimes when trying to create a master list of sockets that holds pointers to sockets owned by another list.

use std::{str, thread};
use std::thread::JoinHandle;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
use std::ops::DerefMut;
use std::sync::mpsc::{channel, Sender, Receiver, TryRecvError};
use self::socketlist::SocketList;
use self::mastersocketlist::MasterSocketList;

pub struct Socket {
    user: String,
    stream: TcpStream,
}

mod socketlist {
    use self::SocketList::{Node, End};
    use super::Socket;

    pub enum SocketList {
        Node(Socket, Box<SocketList>),
        End,
    }

    impl SocketList {
        pub fn new() -> SocketList {
            End
        }

        pub fn add(self, socket: Socket) -> SocketList {
            Node(socket, Box::new(self))
        }

        pub fn newest<'a>(&'a mut self) -> Result<&'a Socket, String> {
            match *self {
                Node(ref mut socket, ref mut next) => Ok(socket),
                End => Err("No socket available".to_string()),
            }
        }
    }
}

mod mastersocketlist {
    use self::MasterSocketList::{Node, End};
    use super::Socket;

    pub enum MasterSocketList<'a> {
        Node(Box<&'a Socket>, Box<MasterSocketList<'a>>),
        End,
    }

    impl<'a> MasterSocketList<'a> {
        pub fn new() -> MasterSocketList<'a> {
            End
        }

        pub fn add(self, socket: &'a Socket) -> MasterSocketList<'a> {
            MasterSocketList::Node(Box::new(&socket), Box::new(self))
        }
    }
}

pub struct SlotManager {
    prox: JoinHandle<()>,
    prox_tx: Sender<TcpStream>,
}

impl SlotManager {
    pub fn new() -> SlotManager {
        let (tx, rx): (Sender<TcpStream>, Receiver<TcpStream>) = channel();

        let tx_clone = tx.clone();
        let prox = thread::spawn(move || SlotManager::event_loop(tx, rx));

        SlotManager {
            prox: prox,
            prox_tx: tx_clone,
        }
    }

    pub fn sender(&self) -> Sender<TcpStream> {
        self.prox_tx.clone()
    }

    fn event_loop(tx: Sender<TcpStream>, rx: Receiver<TcpStream>) {
        let socket_list = Arc::new(Mutex::new(MasterSocketList::new()));
        let mut slot = Slot::new(socket_list.clone());
        loop {
            match rx.try_recv() {
                Ok(stream) => slot.new_connection(stream),
                Err(e) => {}
            }
        }
    }
}

pub struct Slot {
    prox: JoinHandle<()>,
    prox_tx: Sender<TcpStream>,
}

impl Slot {
    pub fn new(master_socket_list: Arc<Mutex<MasterSocketList>>) -> Slot {
        let (tx, rx): (Sender<TcpStream>, Receiver<TcpStream>) = channel();

        let tx_clone = tx.clone();
        let prox = thread::spawn(move || Slot::event_loop(tx, rx, master_socket_list));

        Slot {
            prox: prox,
            prox_tx: tx_clone,
        }
    }

    pub fn new_connection(&self, stream: TcpStream) {
        self.prox_tx.send(stream);
    }

    fn event_loop(tx: Sender<TcpStream>,
                  rx: Receiver<TcpStream>,
                  master_socket_list: Arc<Mutex<MasterSocketList>>) {

        let mut sockets = SocketList::new();
        loop {
            // Check for new connections
            match rx.try_recv() {
                Ok(stream) => {
                    let mut socket = Socket {
                        user: "default".to_string(),
                        stream: stream,
                    };
                    sockets = sockets.add(socket);

                    let mut msl_guard = match master_socket_list.lock() {
                        Ok(guard) => guard,
                        Err(poisoned) => poisoned.into_inner(),
                    };
                    let mut msl_handle = msl_guard.deref_mut();
                    *msl_handle = msl_handle.add(sockets.newest().unwrap());
                }
                Err(e) => {}
            }
        }
    }
}

fn main() {
    let mut slot_manager = SlotManager::new();
    let listener = TcpListener::bind("127.0.0.1:1234").unwrap();
    for stream in listener.incoming() {
        match stream {
            Ok(stream) => {
                let sender = slot_manager.sender();
                thread::spawn(move || {
                    sender.send(stream);
                    //process_new_connection(stream, sender)
                });
            }
            Err(e) => println!("Connection error: {}", e),
        }
    }
    drop(listener);
}

The errors that I receive...

error[E0477]: the type `[closure@src/main.rs:107:34: 107:86 tx:std::sync::mpsc::Sender<std::net::TcpStream>, rx:std::sync::mpsc::Receiver<std::net::TcpStream>, master_socket_list:std::sync::Arc<std::sync::Mutex<mastersocketlist::MasterSocketList<'_>>>]` does not fulfill the required lifetime
   --> src/main.rs:107:20
    |
107 |         let prox = thread::spawn(move || Slot::event_loop(tx, rx, master_socket_list));
    |                    ^^^^^^^^^^^^^
    |
    = note: type must outlive the static lifetime

I don't even know if what I am trying to do is possible in safe code.

I want the mastersocketlist to contain a pointer to a socket, where the socket's lifetime is defined by the thread that created it. I believe that's what all those errors mean, but I have no idea how to provide the proper lifetime annotations to fix it.

nathansizemore asked Feb 22 '15 05:02


1 Answer

A great thing about Rust is that the type-checking across functions is done solely by the function signature. That means you can replace most of the bodies of functions with unimplemented!() and preserve type-checking errors.

Repeat that process a few times, and you end up not calling a lot of functions - remove those. Inlining modules and reducing structs / enums can also help.

At some point your error will disappear - the first clue towards the problem! Keep at it, and you get a tiny reproduction:

use std::sync::{Arc, Mutex};
use std::thread;

pub enum MasterSocketList<'a> {
    One(&'a u8),
}

pub struct Slot;

impl Slot {
    pub fn new<'a>(master_socket_list: Arc<Mutex<MasterSocketList<'a>>>) -> Slot {
        thread::spawn(move || {
            master_socket_list;
        });
        unimplemented!();
    }
}

fn main() {}

Checking out the error, it still matches:

error[E0477]: the type `[closure@src/main.rs:12:23: 14:10 master_socket_list:std::sync::Arc<std::sync::Mutex<MasterSocketList<'a>>>]` does not fulfill the required lifetime
  --> src/main.rs:12:9
   |
12 |         thread::spawn(move || {
   |         ^^^^^^^^^^^^^
   |
   = note: type must satisfy the static lifetime

Let's check the docs for the signature of thread::spawn:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
    F: FnOnce() -> T,
    F: Send + 'static,
    T: Send + 'static, 

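The key point here is F: Send + 'static - any reference captured by the closure you give to spawn must be valid for the entire life of the program (owned values are fine, because they move into the closure). This is because spawn can create threads that become detached. Once detached, the thread could live forever, so all references must live at least that long; otherwise you'd get dangling references, a bad thing! In your code, Arc<Mutex<MasterSocketList<'a>>> carries the lifetime 'a of the borrowed Sockets, and nothing guarantees that 'a is 'static, so the closure is rejected. Rust saves the day, once again!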
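As a minimal sketch of what the 'static bound accepts and rejects (the helper name sum_on_thread is hypothetical and not part of the question's code): moving an owned value into the closure satisfies the bound, while borrowing a local does not.

```rust
use std::thread;

// Hypothetical helper: sums the values on a spawned thread.
fn sum_on_thread(values: Vec<u8>) -> u32 {
    // `move` transfers ownership of `values` into the closure, so the closure
    // holds no borrowed data and satisfies `F: Send + 'static`.
    let handle = thread::spawn(move || values.iter().map(|&v| u32::from(v)).sum::<u32>());
    handle.join().expect("worker thread panicked")
}

fn main() {
    let total = sum_on_thread(vec![1, 2, 3]);
    println!("total = {}", total);

    // By contrast, borrowing a local is rejected by the compiler, because the
    // spawned thread could outlive the stack frame that owns `local`:
    //
    //     let local = 5u8;
    //     thread::spawn(|| println!("{}", local));
}
```

The same reasoning applies to a type with a lifetime parameter such as MasterSocketList<'a>: it is only 'static when 'a itself is 'static.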

If you want to guarantee that the threads will terminate at a known point, you can use scoped threads, such as those provided by scoped-threadpool or crossbeam.
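For illustration, here is a sketch using std::thread::scope from the standard library (stable since Rust 1.63) rather than the crates named above; the idea is the same. The Socket struct is a hypothetical stand-in without the TcpStream, and total_name_len is an invented helper.

```rust
use std::thread;

// Hypothetical stand-in for the question's `Socket`, without the TcpStream.
struct Socket {
    user: String,
}

// Traverses borrowed sockets from two scoped threads and adds up the lengths
// of the user names. No `'static` bound is needed: the scope guarantees that
// both threads have finished before `sockets` can go out of scope.
fn total_name_len(sockets: &[Socket]) -> usize {
    let (left, right) = sockets.split_at(sockets.len() / 2);
    thread::scope(|s| {
        let l = s.spawn(|| left.iter().map(|sock| sock.user.len()).sum::<usize>());
        let r = s.spawn(|| right.iter().map(|sock| sock.user.len()).sum::<usize>());
        l.join().expect("left worker panicked") + r.join().expect("right worker panicked")
    })
}

fn main() {
    let sockets = vec![
        Socket { user: "alice".to_string() },
        Socket { user: "bob".to_string() },
    ];
    println!("total name length = {}", total_name_len(&sockets));
}
```

Note that scoped threads fit work that finishes at a known point; they are probably not a match for workers that are expected to live forever, as in the question.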

If your code didn't have a type with a lifetime parameter inside of it, some form of shared ownership like Arc, paired with something that ensures only one thread can mutate the value at a time, like Mutex, would have been sufficient. This allows each thread to own the shared value, which is finally dropped when the last owner goes away. See How do I share a mutable object between threads? for details.
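One way this could look for the question's design, as a sketch only: the master list stores Arc<Mutex<Socket>> handles instead of &'a Socket, so it has no lifetime parameter. The Socket struct (without the TcpStream), the type aliases, and run_workers are all hypothetical names introduced for this example.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for the question's `Socket`, without the TcpStream.
struct Socket {
    user: String,
}

// Assumed aliases: each socket is shared, and so is the master list.
type SharedSocket = Arc<Mutex<Socket>>;
type MasterList = Arc<Mutex<Vec<SharedSocket>>>;

// Spawns `worker_count` workers. Each creates one socket, keeps a handle in
// its own list, and registers a second handle in the master list.
fn run_workers(worker_count: usize) -> Vec<String> {
    let master: MasterList = Arc::new(Mutex::new(Vec::new()));

    let handles: Vec<_> = (0..worker_count)
        .map(|id| {
            let master = Arc::clone(&master);
            // The closure owns only `Arc`s and an integer, so it is `'static`.
            thread::spawn(move || {
                let mut own_sockets: Vec<SharedSocket> = Vec::new();
                let socket = Arc::new(Mutex::new(Socket { user: format!("user{}", id) }));
                own_sockets.push(Arc::clone(&socket));
                master.lock().expect("master list poisoned").push(socket);
                // A real worker would loop here; this sketch returns at once.
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("worker thread panicked");
    }

    // Traverse every socket through the master list.
    let mut users: Vec<String> = master
        .lock()
        .expect("master list poisoned")
        .iter()
        .map(|s| s.lock().expect("socket poisoned").user.clone())
        .collect();
    users.sort();
    users
}

fn main() {
    println!("{:?}", run_workers(3));
}
```

The trade-off is a lock per socket access; whether that is acceptable depends on how often the master list is traversed.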

Shepmaster answered Sep 27 '22 16:09