
What is the standard way to get a Rust thread out of blocking operations?

Coming from Java, I am used to idioms along the lines of

while (true) {
  try {
    someBlockingOperation();
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // re-set the interrupted flag
    cleanup(); // whatever is necessary
    break;
  }
}

This works, as far as I know, across the whole JDK for anything that might block, like reading from files, from sockets, from a queue and even for Thread.sleep().

Reading up on how this is done in Rust, I find lots of seemingly special-purpose solutions mentioned, like mio and tokio. I also found ErrorKind::Interrupted and tried to trigger it by sending SIGINT to the thread, but the thread seems to die immediately without leaving any (back)trace.

Here is the code I used (note: not very well versed in Rust yet, so it might look a bit strange, but it runs):

use std::io;
use std::io::Read;
use std::thread;

pub fn main() {
    let sub_thread = thread::spawn(|| {
        let mut buffer = [0; 10];
        loop {
            let d = io::stdin().read(&mut buffer);
            println!("{:?}", d);
            let n = d.unwrap();
            if n == 0 {
                break;
            }
            println!("-> {:?}", &buffer[0..n]);
        }
    });

    sub_thread.join().unwrap();
}

By "blocking operations", I mean:

  • sleep
  • socket IO
  • file IO
  • queue IO (not sure yet where the queues are in Rust)

What would be the respective means to signal to a thread, like Thread.interrupt() in Java, that it's time to pack up and go home?

Asked by Harald, Sep 23 '18 11:09



1 Answer

There is no such thing. Blocking means blocking.

Instead, you deliberately use tools that are non-blocking. That's where libraries like mio, Tokio, or futures come in — they handle the architecture of sticking all of these non-blocking, asynchronous pieces together.

catch (InterruptedException e)

Rust doesn't have exceptions. If you expect to handle a failure case, that's better represented with a Result.

Thread.interrupt()

This doesn't actually do anything beyond setting a flag in the thread that some code may check and then throw an exception for. You could build the same structure yourself. One simple implementation:

use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread,
    time::Duration,
};

fn main() {
    let please_stop = Arc::new(AtomicBool::new(false));

    let t = thread::spawn({
        let should_i_stop = please_stop.clone();
        move || {
            while !should_i_stop.load(Ordering::SeqCst) {
                thread::sleep(Duration::from_millis(100));
                println!("Sleeping");
            }
        }
    });

    thread::sleep(Duration::from_secs(1));
    please_stop.store(true, Ordering::SeqCst);
    t.join().unwrap();
}

Sleep

No way of interrupting, as far as I know. The documentation even says:

On Unix platforms this function will not return early due to a signal
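Although thread::sleep itself cannot be cut short, a thread that only needs to wait can block on something that another thread is able to wake up. Here is a minimal sketch of that idea, swapping the sleep for a timed wait on a channel that is used purely as a stop signal. The names run_worker, interval and stop_after are made up for this illustration:

```rust
use std::{
    sync::mpsc::{self, RecvTimeoutError},
    thread,
    time::{Duration, Instant},
};

/// Spawns a worker that waits `interval` between rounds of work and stops
/// as soon as it is signalled. Returns how many full intervals elapsed.
/// (Hypothetical helper, not part of any library.)
fn run_worker(interval: Duration, stop_after: Duration) -> u32 {
    // The channel carries no data; a message (or a hang-up) means "stop".
    let (stop_tx, stop_rx) = mpsc::channel::<()>();

    let worker = thread::spawn(move || {
        let mut ticks = 0u32;
        loop {
            // Waits like a sleep, but returns early when a message arrives
            // or when the sending side has been dropped.
            match stop_rx.recv_timeout(interval) {
                Err(RecvTimeoutError::Timeout) => ticks += 1, // full interval elapsed
                Ok(()) | Err(RecvTimeoutError::Disconnected) => break, // asked to stop
            }
        }
        ticks
    });

    thread::sleep(stop_after);
    // Ignore the error case: it only means the worker has already exited.
    let _ = stop_tx.send(());
    worker.join().unwrap()
}

fn main() {
    let started = Instant::now();
    // The worker "sleeps" for 10 s at a time but is woken after about 50 ms.
    let ticks = run_worker(Duration::from_secs(10), Duration::from_millis(50));
    println!(
        "worker stopped after {:?} with {} completed intervals",
        started.elapsed(),
        ticks
    );
}
```

The worker never calls thread::sleep, so the stop request takes effect without waiting for the interval to run out.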

Socket IO

You put the socket into nonblocking mode using methods like set_nonblocking and then handle ErrorKind::WouldBlock.
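As a rough sketch of what that can look like with only the standard library: the reader below polls a nonblocking TcpStream and checks a stop flag between attempts. The helper names read_until_stopped and demo, the 10 ms back-off and the loopback setup are assumptions made for this example, not a recommended design:

```rust
use std::{
    io::{self, ErrorKind, Read, Write},
    net::{TcpListener, TcpStream},
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread,
    time::Duration,
};

/// Reads from `stream` until the peer closes it or `stop` is set.
/// Returns whatever was received up to that point. (Hypothetical helper.)
fn read_until_stopped(mut stream: TcpStream, stop: &AtomicBool) -> io::Result<Vec<u8>> {
    stream.set_nonblocking(true)?;
    let mut received = Vec::new();
    let mut buffer = [0u8; 64];

    while !stop.load(Ordering::SeqCst) {
        match stream.read(&mut buffer) {
            Ok(0) => break, // peer closed the connection
            Ok(n) => received.extend_from_slice(&buffer[..n]),
            // Nothing to read right now: back off briefly, then re-check the flag.
            Err(e) if e.kind() == ErrorKind::WouldBlock => {
                thread::sleep(Duration::from_millis(10));
            }
            Err(e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(received)
}

/// Connects a client to a local listener, sends a few bytes, then asks
/// the reader thread to stop while the connection is still open.
fn demo() -> io::Result<Vec<u8>> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    let stop = Arc::new(AtomicBool::new(false));

    let reader = thread::spawn({
        let stop = Arc::clone(&stop);
        move || -> io::Result<Vec<u8>> {
            let (stream, _) = listener.accept()?;
            read_until_stopped(stream, &stop)
        }
    });

    // The client never closes the connection, so a blocking read would hang.
    let mut client = TcpStream::connect(addr)?;
    client.write_all(b"hello")?;

    thread::sleep(Duration::from_millis(200));
    stop.store(true, Ordering::SeqCst);
    let received = reader.join().expect("reader thread panicked")?;
    drop(client); // the connection stayed open until the reader had stopped
    Ok(received)
}

fn main() -> io::Result<()> {
    let received = demo()?;
    println!("received {:?} before stopping", String::from_utf8_lossy(&received));
    Ok(())
}
```

Polling with a short sleep is the crudest option; the libraries listed under "See also" replace it with readiness notifications from the operating system.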

See also:

  • Tokio
  • async-std

File IO

There isn't really a good cross-platform way of performing asynchronous file IO. Most implementations spin up a thread pool and perform the blocking operations there, handing the data back over something that supports non-blocking operations.
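A minimal sketch of that pattern, assuming a single helper thread instead of a pool and a standard-library channel as the hand-off. The names read_in_background and demo, and the temporary file, are invented for the example:

```rust
use std::{
    fs::File,
    io::{self, Read},
    path::PathBuf,
    sync::mpsc::{self, Receiver, RecvTimeoutError},
    thread,
    time::Duration,
};

/// Performs the blocking reads on a separate thread and forwards each
/// chunk over a channel. (Hypothetical helper.)
fn read_in_background(path: PathBuf) -> Receiver<io::Result<Vec<u8>>> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let mut file = match File::open(&path) {
            Ok(file) => file,
            Err(e) => {
                let _ = tx.send(Err(e));
                return;
            }
        };
        let mut buffer = [0u8; 4096];
        loop {
            match file.read(&mut buffer) {
                Ok(0) => break, // end of file; dropping `tx` closes the channel
                Ok(n) => {
                    // A send error means the receiver is gone, so stop reading.
                    if tx.send(Ok(buffer[..n].to_vec())).is_err() {
                        break;
                    }
                }
                Err(e) => {
                    let _ = tx.send(Err(e));
                    break;
                }
            }
        }
    });
    rx
}

/// Writes a small temporary file, then collects it through the channel
/// without ever blocking for more than 50 ms at a time.
fn demo() -> io::Result<Vec<u8>> {
    let path = std::env::temp_dir()
        .join(format!("blocking_read_demo_{}.txt", std::process::id()));
    std::fs::write(&path, b"some file contents")?;

    let rx = read_in_background(path.clone());
    let mut contents = Vec::new();
    loop {
        match rx.recv_timeout(Duration::from_millis(50)) {
            Ok(chunk) => contents.extend_from_slice(&chunk?),
            // No data yet: the caller is free to do other work or check a stop flag.
            Err(RecvTimeoutError::Timeout) => {}
            // The reader thread finished and every chunk has been drained.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    std::fs::remove_file(&path)?;
    Ok(contents)
}

fn main() -> io::Result<()> {
    let contents = demo()?;
    println!("read {} bytes via the helper thread", contents.len());
    Ok(())
}
```

The read itself still blocks; what changes is that it blocks a thread nobody is waiting on, so the caller can walk away at any time by dropping the receiver.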

See also:

  • Tokio
  • async-std

Queue IO

Perhaps you mean something like an MPSC channel, in which case you'd use tools like try_recv.
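For instance, a consumer can combine try_recv with the same kind of stop flag as in the earlier example. This is only a sketch: run_consumer, the u32 items and the 10 ms idle pause are assumptions for illustration:

```rust
use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        mpsc::{self, TryRecvError},
        Arc,
    },
    thread,
    time::Duration,
};

/// Sends `items` to a consumer thread, then asks it to stop.
/// Returns the sum of everything the consumer received. (Hypothetical helper.)
fn run_consumer(items: Vec<u32>) -> u32 {
    let (tx, rx) = mpsc::channel::<u32>();
    let please_stop = Arc::new(AtomicBool::new(false));

    let consumer = thread::spawn({
        let should_i_stop = Arc::clone(&please_stop);
        move || {
            let mut sum = 0;
            while !should_i_stop.load(Ordering::SeqCst) {
                match rx.try_recv() {
                    Ok(n) => sum += n,
                    // Queue is empty: pause briefly, then re-check the flag.
                    Err(TryRecvError::Empty) => thread::sleep(Duration::from_millis(10)),
                    // Every sender is gone, so no more items can arrive.
                    Err(TryRecvError::Disconnected) => break,
                }
            }
            sum
        }
    });

    for item in items {
        tx.send(item).unwrap();
    }

    // Give the consumer time to drain the queue, then tell it to stop.
    // `tx` is still alive here, so the consumer exits because of the flag.
    thread::sleep(Duration::from_millis(200));
    please_stop.store(true, Ordering::SeqCst);
    consumer.join().unwrap()
}

fn main() {
    let sum = run_consumer(vec![1, 2, 3]);
    println!("consumer received a total of {}", sum);
}
```

Receiver::recv_timeout is an alternative when a short bounded wait is preferable to polling and sleeping.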

See also:

  • How to terminate or suspend a Rust thread from another thread?
  • What is the best approach to encapsulate blocking I/O in future-rs?
  • What does java.lang.Thread.interrupt() do?
Answered by Shepmaster, Sep 30 '22 02:09
