I'm using Tokio 1.1 to do async things. I have an async
main
with #[tokio::main]
so I'm already operating with a runtime.
main
invokes a non-async method where I'd like to be await
on a future (specifically, I'm collecting from a datafusion dataframe). This non-async method has a signature prescribed by a trait which returns a struct, not a Future<Struct>
. As far as I'm aware, I can't mark it async.
If I try and call df.collect().await;
, I get the
only allowed inside
async
functions and blocks
error from the compiler, pointing out that the method that I'm calling await
within is not async
.
If I try and block_on
the future from a new runtime like this:
tokio::runtime::Builder::new_current_thread()
.build()
.unwrap()
.block_on(df.collect());
I get a runtime panic:
Cannot start a runtime from within a runtime. This happens because a function (like
block_on
) attempted to block the current thread while the thread is being used to drive asynchronous tasks.
If I try futures::executor::block_on(df.collect()).unwrap();
, I get a new runtime panic:
'not currently running on a Tokio 0.2.x runtime.'
which is weird because I'm using Tokio v1.1.
This feels harder than it should. I'm within an async context, and it feels like the compiler should know that and allow me to call .await
from within the method - the only code path invokes this method from within an async
block. Is there a simple way to do this that I'm missing?
I'm within an async context, and it feels like the compiler should know that and allow me to call .await from within the method
It is fundamentally impossible to await
inside of a synchronous function whether or not you are within the context of a runtime. await
's are transformed into yield points, and async
functions are transformed into state machine's that make use of these yield points to perform asynchronous computations. Without marking your function as async
, this transformation is impossible.
If I understand your question correctly, you have the following code:
#[tokio::main]
async fn main() {
let foo = Foo {};
foo.bar()
}
impl Trait for Foo {
fn bar(df: DataFrame) -> Vec<Data> {
df.collect().await
}
}
The issue is that you cannot await df.collect
from within bar
, because it is not marked as async
. If you can modify the signature of Trait
, then you can make Trait::bar
an async method with the workarounds mentioned in How can I define an async method in a trait?.
If you cannot change the signature of Trait
, then you have a problem. Async functions should never spend a long time without reaching a .await
. As explained in What is the best approach to encapsulate blocking I/O in future-rs?, you can to use spawn_blocking
when transitioning into non-async code:
#[tokio::main]
async fn main() {
let foo = Foo {};
tokio::task::spawn_blocking(move || foo.bar()).await
}
impl Trait for Foo {
fn bar(df: DataFrame) -> Vec<Data> {
df.collect().await
}
}
Now you need a way to run df.collect
to completion, without awaiting. You mentioned that you tried to create a nested runtime to solve this problem:
If I try and block_on the future from a new runtime ... I get a panic
However, tokio does not allow you create nested runtimes. You could create a new, independent runtime, as explained in How can I create a Tokio runtime inside another Tokio runtime. However, spawning a nested runtime would be inefficient.
Instead of spawning a new runtime, you can get a handle to the current runtime:
let handle = Handle::current();
Enter the runtime context:
handle.enter();
And then run the future to completion with futures::executor::block_on
:
impl Trait for Foo {
fn bar(df: DataFrame) -> Vec<Data> {
let handle = Handle::current();
handle.enter();
futures::executor::block_on(df.collect())
}
}
Entering the tokio runtime context will solve the error you were getting previously:
If I try futures::executor::block_on(df.collect()).unwrap();, I get a new runtime panic
not currently running on a Tokio 0.2.x runtime
I would urge you to try and avoid doing this if you can. The optimal solution would be to mark Trait::bar
as async
and await
as normal. Any other solutions, including the ones mentioned above, involve blocking the current thread until the given future completes.
Credit @AliceRyhl for the explanation
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With