Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to correctly exit the thread blocking on mpsc::Receiver

Tags:

rust

channel

impl A {
    fn new() -> (A, std::sync::mpsc::Receiver<Data>) {
        let (sender, receiver) = std::sync::mpsc::channel();

        let objA = A { sender: sender, }; // A spawns threads, clones and uses sender etc
        (objA, receiver)
    }
}

impl B {
    fn new() -> B {
        let (objA, receiver) = A::new();

        B {
            a: objA,
            join_handle: Some(std::thread::spwan(move || {
                loop {
                    match receiver.recv() {
                        Ok(data) => /* Do Something, inform main thread etc */,
                        Err(_) => break,
                    }
                }
            })),
        }
    }
}

impl Drop for B {
    fn drop(&mut self) {
        // Want to do something like "sender.close()/receiver.close()" etc so that the following
        // thread joins. But there is no such function. How do i break the following thread ?
        self.join_handle().take().unwrap().join().unwrap();
    }
}

Is there a way to cleanly exit under such a circumstance ? The thing is that when either receiver or sender is dropped the other sniffs this and gives an error. In case of receiver it will be woken up and will yield an error in which case i am breaking out of the infinite and blocking loop above. However how do i do that explicitly using this very property of channels, without resorting to other flags in conjunction with try_recv()etc., and cleanly exit my thread deterministically?

like image 975
ustulation Avatar asked Jul 31 '15 12:07

ustulation


2 Answers

Why not sending a specific message to shut this thread? I do not know what is your data but most of the time it may be an enum and adding a enum variant like 'MyData::Shutdown' in your receive you can simply break out of the loop.

like image 60
cheme Avatar answered Nov 13 '22 17:11

cheme


You can wrap the a field of your B type in an Option. This way in the Drop::drop method you can do drop(self.a.take()) which will replace the field with a None and drop the sender. This closes the channel and your thread can now be properly joined.

like image 2
oli_obk Avatar answered Nov 13 '22 17:11

oli_obk