 

How do I await a future inside a non-async method which was called from an async method within the context of a Tokio runtime?

I'm using Tokio 1.1 to do async things. I have an async main with #[tokio::main] so I'm already operating with a runtime.

main invokes a non-async method in which I'd like to await a future (specifically, I'm collecting from a datafusion DataFrame). The method's signature is prescribed by a trait and returns a struct, not a Future<Struct>, so as far as I'm aware I can't mark it async.

If I try and call df.collect().await;, I get the

only allowed inside async functions and blocks

error from the compiler, pointing out that the method that I'm calling await within is not async.

If I try and block_on the future from a new runtime like this:

tokio::runtime::Builder::new_current_thread()
    .build()
    .unwrap()
    .block_on(df.collect());

I get a runtime panic:

Cannot start a runtime from within a runtime. This happens because a function (like block_on) attempted to block the current thread while the thread is being used to drive asynchronous tasks.

If I try futures::executor::block_on(df.collect()).unwrap();, I get a new runtime panic:

'not currently running on a Tokio 0.2.x runtime.'

which is weird because I'm using Tokio v1.1.

This feels harder than it should. I'm within an async context, and it feels like the compiler should know that and allow me to call .await from within the method - the only code path invokes this method from within an async block. Is there a simple way to do this that I'm missing?

growse asked Feb 03 '21 20:02


1 Answer

I'm within an async context, and it feels like the compiler should know that and allow me to call .await from within the method

It is fundamentally impossible to await inside a synchronous function, whether or not you are within the context of a runtime. Each .await is transformed into a yield point, and an async function is transformed into a state machine that uses these yield points to suspend and resume the computation. Without marking your function as async, this transformation cannot happen.
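To make the "state machine" point concrete, here is a hand-written approximation, using only the standard library, of the kind of type the compiler generates for an async function with one yield point. The names `Double`, `NoopWaker` and `poll_to_completion` are hypothetical, and the real compiler-generated type is anonymous and more involved. The sketch also shows that a synchronous caller has no `.await`; the only thing it can do with a future is call `poll` until it reports `Ready`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical hand-written approximation of the state machine for
//     async fn double(x: u32) -> u32 { some_future.await; x * 2 }
// Each `.await` becomes a state in which `poll` may return `Pending`.
enum Double {
    Start(u32),   // before the yield point
    Resumed(u32), // after the yield point
    Done,
}

impl Future for Double {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        // `Double` only holds a `u32`, so it is `Unpin` and can be mutated freely.
        let this = self.get_mut();
        match std::mem::replace(this, Double::Done) {
            Double::Start(x) => {
                // Yield point: remember where to resume and ask to be polled again.
                *this = Double::Resumed(x);
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            Double::Resumed(x) => Poll::Ready(x * 2),
            Double::Done => panic!("future polled after completion"),
        }
    }
}

// A waker that does nothing; adequate here because we poll in a loop anyway.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// A synchronous function cannot `.await`: it can only call `poll` until the
// future reports `Ready`. Returns the output and the number of polls needed.
fn poll_to_completion<F: Future + Unpin>(mut fut: F) -> (F::Output, u32) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = Pin::new(&mut fut).poll(&mut cx) {
            return (out, polls);
        }
    }
}
```

`poll_to_completion(Double::Start(21))` takes two polls: the first stops at the yield point and the second produces 42. Spinning like this is only reasonable for a toy future; real executors park the thread or run other tasks between polls.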

If I understand your question correctly, you have the following code:

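#[tokio::main]
async fn main() {
    let foo = Foo {};
    let df = make_dataframe(); // hypothetical: however the DataFrame is built
    let data = foo.bar(df);
}

trait Trait {
    // Signature fixed by the trait: synchronous, returns the data directly.
    fn bar(&self, df: DataFrame) -> Vec<Data>;
}

impl Trait for Foo {
    fn bar(&self, df: DataFrame) -> Vec<Data> {
        // error: `await` is only allowed inside `async` functions and blocks
        df.collect().await
    }
}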

The issue is that you cannot await df.collect from within bar, because it is not marked as async. If you can modify the signature of Trait, then you can make Trait::bar an async method with the workarounds mentioned in How can I define an async method in a trait?.
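If you do control the trait, note that since Rust 1.75 an `async fn` can be declared directly in a trait, which may make the workarounds unnecessary (the `async-trait` crate is the usual pre-1.75 alternative). Traits with native `async fn` cannot be used as `dyn Trait`, so the workarounds still matter in that case. The sketch below assumes Rust 1.75+. `collect` is a hypothetical stand-in for `DataFrame::collect`, and `Vec<u32>` stands in for the DataFrame and its data. It includes a tiny polling `block_on` only so that it runs without Tokio; in the question's program the caller would be the `#[tokio::main]` function.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for `DataFrame::collect`: some async operation.
async fn collect(rows: Vec<u32>) -> Vec<u32> {
    rows
}

// Native `async fn` in a trait (Rust 1.75+).
trait Trait {
    async fn bar(&self, rows: Vec<u32>) -> Vec<u32>;
}

struct Foo;

impl Trait for Foo {
    async fn bar(&self, rows: Vec<u32>) -> Vec<u32> {
        // Allowed now: `bar` is async, so it can `.await`.
        collect(rows).await
    }
}

// The async caller (in the question, `#[tokio::main] async fn main`).
async fn caller() -> Vec<u32> {
    Foo.bar(vec![1, 2, 3]).await
}

// Minimal executor so this sketch runs without Tokio: poll in a loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

With this shape, `block_on(caller())` yields `vec![1, 2, 3]`. No synchronous method ever has to wait on a future, because every layer up to the runtime is async.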

If you cannot change the signature of Trait, then you have a problem. Async functions should never spend a long time without reaching a .await. As explained in What is the best approach to encapsulate blocking I/O in future-rs?, you can use tokio::task::spawn_blocking when transitioning into non-async code. This runs the blocking call on a thread reserved for blocking work instead of on one of the runtime's worker threads:

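#[tokio::main]
async fn main() {
    let foo = Foo {};
    let df = make_dataframe(); // hypothetical: however the DataFrame is built
    // Run the synchronous `bar` on Tokio's blocking thread pool; `foo` and
    // `df` are moved into the closure, so they must be `Send + 'static`.
    let data = tokio::task::spawn_blocking(move || foo.bar(df))
        .await
        .expect("blocking task panicked");
}

impl Trait for Foo {
    fn bar(&self, df: DataFrame) -> Vec<Data> {
        // Still does not compile: `bar` cannot `.await` the future.
        df.collect().await
    }
}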

Now you need a way to run df.collect to completion, without awaiting. You mentioned that you tried to create a nested runtime to solve this problem:

If I try and block_on the future from a new runtime ... I get a panic

However, Tokio does not allow you to create nested runtimes. You could create a new, independent runtime, as explained in How can I create a Tokio runtime inside another Tokio runtime, but starting a second runtime just to drive one future would be inefficient.

Instead of spawning a new runtime, you can get a handle to the current runtime:

let handle = tokio::runtime::Handle::current();

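Enter the runtime context, keeping the returned guard alive for as long as you need the context (in Tokio 1.x the context is exited as soon as the guard is dropped):

let _guard = handle.enter();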

And then run the future to completion with futures::executor::block_on:

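use tokio::runtime::Handle;

impl Trait for Foo {
    fn bar(&self, df: DataFrame) -> Vec<Data> {
        let handle = Handle::current();
        // Keep the guard alive: the context is exited when `_guard` is dropped.
        let _guard = handle.enter();
        // Blocks this thread until `collect` completes, so call `bar` through
        // `spawn_blocking` rather than directly on a runtime worker thread.
        futures::executor::block_on(df.collect())
    }
}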

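Entering the Tokio runtime context is meant to address the error you were getting previously:

If I try futures::executor::block_on(df.collect()).unwrap();, I get a new runtime panic not currently running on a Tokio 0.2.x runtime

Note, however, that this panic names Tokio 0.2.x, so it comes from Tokio 0.2 code rather than from your Tokio 1.1 runtime. That suggests some dependency in your build (possibly the datafusion version you are using) still depends on Tokio 0.2. A Tokio 1.x context cannot satisfy that check, so the dependency versions would also need to agree.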

I would urge you to try and avoid doing this if you can. The optimal solution would be to mark Trait::bar as async and await as normal. Any other solutions, including the ones mentioned above, involve blocking the current thread until the given future completes.
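To show what "blocking the current thread until the future completes" means, here is a simplified, std-only sketch of a `block_on`. It is written in the spirit of `futures::executor::block_on` but is not that crate's implementation. It polls the future and, whenever the future is pending, parks the calling thread until the future's waker unparks it. `ThreadWaker`, `YieldOnce` and `sync_bar` are hypothetical names used for illustration only.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Waking the future unparks the thread that is blocked inside `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Simplified `block_on`: poll, and if not ready, park the *current thread*
// until a wake-up arrives, then poll again.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // The thread does no other work while it waits; this is why the
            // answer runs such code via `spawn_blocking`.
            Poll::Pending => thread::park(),
        }
    }
}

// A future that returns `Pending` once (waking itself) before completing.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// A synchronous function (like the trait's `bar`) blocking on async work.
fn sync_bar(rows: Vec<u32>) -> usize {
    block_on(async move {
        YieldOnce(false).await;
        rows.len()
    })
}
```

`thread::park` can return spuriously, which the loop tolerates by simply polling again. The key cost is visible in the `Pending` arm: the calling thread is tied up for the entire wait.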

Credit @AliceRyhl for the explanation

Ibraheem Ahmed answered Sep 26 '22 00:09