I read the tokio documentation and I wonder what the best approach is for encapsulating costly synchronous I/O in a future.
With the reactor framework, we get the advantage of a green threading model: a few OS threads handle a lot of concurrent tasks through an executor.
The future model of tokio is demand driven, which means the future itself polls its internal state to provide information about its completion; this allows backpressure and cancellation. As far as I understand, the polling phase of the future must be non-blocking to work well.
The I/O I want to encapsulate can be seen as a long atomic and costly operation. Ideally, an independent task would perform the I/O and the associated future would poll the I/O thread for the completion status.
The only two options I see are:
1. Include the blocking I/O in the poll function of the future.
2. Spawn an OS thread to perform the I/O and use the future mechanism to poll its state.
As I understand it, neither solution is optimal, and neither takes full advantage of the green-threading model (the first is not advised in the documentation, and the second doesn't pass through the executor provided by the reactor framework). Is there another solution?
Ideally, an independent task would perform the I/O and the associated future would poll the I/O thread for the completion status.
Yes, this is the recommended approach for asynchronous execution. Note that this is not restricted to I/O, but is valid for any long-running synchronous task!
The ThreadPool type from the futures crate was created for this [1].
In this case, you spawn work to run in the pool. The pool itself checks whether the work has completed yet and returns a type that fulfills the Future trait.
use futures::{
    executor::{self, ThreadPool},
    future,
    task::{SpawnError, SpawnExt},
}; // 0.3.1, features = ["thread-pool"]
use std::{thread, time::Duration};

async fn delay_for(pool: &ThreadPool, seconds: u64) -> Result<u64, SpawnError> {
    // Run the blocking call on the pool, sleeping for the requested time.
    pool.spawn_with_handle(async move {
        thread::sleep(Duration::from_secs(seconds));
        seconds
    })?
    .await;
    Ok(seconds)
}

fn main() -> Result<(), SpawnError> {
    let pool = ThreadPool::new().expect("Unable to create threadpool");

    let a = delay_for(&pool, 3);
    let b = delay_for(&pool, 1);

    let c = executor::block_on(async {
        let (a, b) = future::join(a, b).await;
        // Name the error type so the `?` conversions are unambiguous.
        Ok::<u64, SpawnError>(a? + b?)
    });

    println!("{}", c?);

    Ok(())
}
You can see that the total time is only 3 seconds:
% time ./target/debug/example
4
real 3.010
user 0.002
sys 0.003
[1] There's some discussion that the current implementation may not be the best for blocking operations, but it suffices for now.
With Tokio, we use task::spawn_blocking:
use futures::future; // 0.3.15
use std::{thread, time::Duration};
use tokio::task; // 1.7.1, features = ["full"]

async fn delay_for(seconds: u64) -> Result<u64, task::JoinError> {
    task::spawn_blocking(move || {
        thread::sleep(Duration::from_secs(seconds));
        seconds
    })
    .await?;
    Ok(seconds)
}

#[tokio::main]
async fn main() -> Result<(), task::JoinError> {
    let a = delay_for(3);
    let b = delay_for(1);

    let (a, b) = future::join(a, b).await;
    let c = a? + b?;

    println!("{}", c);

    Ok(())
}
See also CPU-bound tasks and blocking code in the Tokio documentation.
Note that this is not an efficient way of sleeping; it's just a placeholder for some blocking operation. If you actually need to sleep, use something like futures-timer or tokio::time::sleep. See "Why does Future::select choose the future with a longer sleep period first?" for more details.
neither solution is optimal, and neither takes full advantage of the green-threading model
That's correct - because you don't have something that is asynchronous! You are trying to combine two different methodologies and there has to be an ugly bit somewhere to translate between them.
the second doesn't pass through the executor provided by the reactor framework
I'm not sure what you mean here. There's an executor implicitly created by block_on or tokio::main. The thread pool has some internal logic that checks to see if a thread is done, but that should only be triggered when the user's executor polls it.
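To make that last point concrete, here is a minimal sketch of the general idea using only the standard library. It is not how futures or Tokio actually implement their pools; it only illustrates the pattern of running blocking work on another thread while the future's poll stays non-blocking and relies on a waker. All names in it (Shared, BlockingTask, spawn_blocking_sketch, ThreadWaker, block_on_sketch, run_example) are hypothetical and made up for this illustration, and it spawns one OS thread per call instead of reusing a pool.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

/// State shared between the worker thread and the future (hypothetical helper).
struct Shared<T> {
    result: Option<T>,
    waker: Option<Waker>,
}

/// A future that becomes ready once the worker thread has stored its result.
struct BlockingTask<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

/// Run `work` on a dedicated OS thread and return a future for its result.
fn spawn_blocking_sketch<T, F>(work: F) -> BlockingTask<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let shared = Arc::new(Mutex::new(Shared { result: None, waker: None }));
    let worker_side = Arc::clone(&shared);
    thread::spawn(move || {
        let value = work(); // the blocking part runs off the executor thread
        let mut guard = worker_side.lock().unwrap();
        guard.result = Some(value);
        // Tell the executor that polling again will now make progress.
        if let Some(waker) = guard.waker.take() {
            waker.wake();
        }
    });
    BlockingTask { shared }
}

impl<T> Future for BlockingTask<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut guard = self.shared.lock().unwrap();
        match guard.result.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Not done yet: remember whom to wake, then return immediately.
                guard.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Waker that unparks the thread waiting in `block_on_sketch`.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// A tiny single-future executor, just enough to drive the example.
fn block_on_sketch<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(), // sleep until the waker unparks us
        }
    }
}

/// Example use: offload a simulated blocking call and wait for its result.
fn run_example() -> u64 {
    block_on_sketch(spawn_blocking_sketch(|| {
        thread::sleep(Duration::from_millis(50)); // stand-in for blocking I/O
        42
    }))
}
```

Calling run_example() from main returns the closure's value once the worker thread finishes. The poll function never blocks on the work itself: it either hands back the stored result or registers the waker and returns Pending, which is the "ugly bit" that translates between the synchronous and asynchronous worlds. In real code, prefer the pool provided by your executor, as shown above.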