
What is the best approach to encapsulate blocking I/O in futures-rs?

I read the tokio documentation and I wonder what is the best approach for encapsulating costly synchronous I/O in a future.

With the reactor framework, we get the advantage of a green threading model: a few OS threads handle a lot of concurrent tasks through an executor.

The future model of tokio is demand-driven, which means the future itself polls its internal state to provide information about its completion, allowing backpressure and cancellation. As far as I understand, the polling phase of the future must be non-blocking to work well.
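
For reference, that demand-driven interface boils down to a single poll method; the sketch below copies today's std::future::Future trait (the futures 0.1 trait current when this question was asked is analogous in spirit):

use std::{
    pin::Pin,
    task::{Context, Poll},
};

pub trait Future {
    type Output;
    // The executor calls poll repeatedly; returning Poll::Pending means
    // "not ready yet, wake me via the Context's Waker when progress can
    // be made", which is why poll itself must never block.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}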

The I/O I want to encapsulate can be seen as a long, atomic, and costly operation. Ideally, an independent task would perform the I/O and the associated future would poll the I/O thread for the completion status.

The only two options I see are:

  • Include the blocking I/O in the poll function of the future.
  • Spawn an OS thread to perform the I/O and use the future mechanism to poll its state, as shown in the documentation.

As I understand it, neither solution is optimal and neither gets the full advantage of the green-threading model (the first is not advised in the documentation and the second doesn't pass through the executor provided by the reactor framework). Is there another solution?

asked Jan 30 '17 by Momh


1 Answer

Ideally, an independent task would perform the I/O and the associated future would poll the I/O thread for the completion status.

Yes, this is the recommended approach for asynchronous execution. Note that this is not restricted to I/O, but is valid for any long-running synchronous task!

Futures crate

The ThreadPool type was created for this.¹

In this case, you spawn work to run in the pool. The pool itself checks whether the work has completed yet and returns a type that implements the Future trait.

use futures::{
    executor::{self, ThreadPool},
    future,
    task::{SpawnError, SpawnExt},
}; // 0.3.1, features = ["thread-pool"]
use std::{thread, time::Duration};

async fn delay_for(pool: &ThreadPool, seconds: u64) -> Result<u64, SpawnError> {
    pool.spawn_with_handle(async move {
        // The blocking work runs on one of the pool's threads.
        thread::sleep(Duration::from_secs(seconds));
        seconds
    })?
    .await;
    Ok(seconds)
}

fn main() -> Result<(), SpawnError> {
    let pool = ThreadPool::new().expect("Unable to create threadpool");

    let a = delay_for(&pool, 3);
    let b = delay_for(&pool, 1);

    let c = executor::block_on(async {
        let (a, b) = future::join(a, b).await;

        Ok(a? + b?)
    });

    println!("{}", c?);
    Ok(())
}

You can see that the total time is only 3 seconds:

% time ./target/debug/example
4

real    3.010
user    0.002
sys     0.003

¹ There's some discussion that the current implementation may not be the best for blocking operations, but it suffices for now.
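
One way to sidestep that concern, sketched here as an assumption rather than taken from the answer above, is to dedicate an OS thread to each blocking operation and hand back a futures::channel::oneshot receiver, which already implements Future (the helper name delay_on_dedicated_thread is made up for illustration):

use futures::channel::oneshot; // 0.3
use std::{thread, time::Duration};

fn delay_on_dedicated_thread(seconds: u64) -> oneshot::Receiver<u64> {
    let (tx, rx) = oneshot::channel();
    thread::spawn(move || {
        // Stand-in for the long, atomic, costly I/O operation.
        thread::sleep(Duration::from_secs(seconds));
        // Sending only fails if the receiver was dropped, i.e. the future was cancelled.
        let _ = tx.send(seconds);
    });
    rx // resolves to Ok(seconds), or Err(Canceled) if the sender is dropped first
}

The receiver can be awaited from any executor, for example executor::block_on(delay_on_dedicated_thread(3)).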

Tokio

Here, we use task::spawn_blocking:

use futures::future; // 0.3.15
use std::{thread, time::Duration};
use tokio::task; // 1.7.1, features = ["full"]

async fn delay_for(seconds: u64) -> Result<u64, task::JoinError> {
    task::spawn_blocking(move || {
        // Runs on Tokio's dedicated pool of threads for blocking work.
        thread::sleep(Duration::from_secs(seconds));
        seconds
    })
    .await?;
    Ok(seconds)
}

#[tokio::main]
async fn main() -> Result<(), task::JoinError> {
    let a = delay_for(3);
    let b = delay_for(1);

    let (a, b) = future::join(a, b).await;
    let c = a? + b?;

    println!("{}", c);

    Ok(())
}

See also CPU-bound tasks and blocking code in the Tokio documentation.

Additional points

Note that this is not an efficient way of sleeping; it's just a placeholder for some blocking operation. If you actually need to sleep, use something like futures-timer or tokio::time::sleep. See Why does Future::select choose the future with a longer sleep period first? for more details.
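
As a rough sketch of that asynchronous alternative (assuming Tokio's time feature is enabled), the same delay helper without blocking an OS thread would look like this:

use std::time::Duration;
use tokio::time; // 1.x, features = ["time"]

async fn delay_for(seconds: u64) -> u64 {
    // Suspends this task only; the executor thread stays free for other work.
    time::sleep(Duration::from_secs(seconds)).await;
    seconds
}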

neither solution is optimal and neither gets the full advantage of the green-threading model

That's correct, because you don't have something that is asynchronous! You are trying to combine two different methodologies, and there has to be an ugly bit somewhere to translate between them.

the second doesn't pass through the executor provided by the reactor framework

I'm not sure what you mean here. There's an executor implicitly created by block_on or tokio::main. The thread pool has some internal logic that checks to see if a thread is done, but that should only be triggered when the user's executor polls it.

answered Oct 14 '22 by Shepmaster