
How can I create a Tokio runtime inside another Tokio runtime without getting the error "Cannot start a runtime from within a runtime"?

I'm using rust_bert for summarising text. I need to set a model with rust_bert::pipelines::summarization::SummarizationModel::new, which fetches the model from the internet. It does this asynchronously using tokio and the issue that (I think) I'm running into is that I am running the Tokio runtime within another Tokio runtime, as indicated by the error message:

Downloading https://cdn.huggingface.co/facebook/bart-large-cnn/config.json to "/home/(censored)/.cache/.rustbert/bart-cnn/config.json"
thread 'main' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.', /home/(censored)/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/runtime/enter.rs:38:5
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

I've tried running the model fetching synchronously with tokio::task::spawn_blocking and tokio::task::block_in_place, but neither works for me. block_in_place gives the same error as if it weren't there, and spawn_blocking doesn't really seem to be of use to me. I've also tried making summarize_text async, but that didn't help much. GitHub issue tokio-rs/tokio#2194 and the Reddit post "'Cannot start a runtime from within a runtime.' with Actix-Web And Postgresql" seem similar (same-ish error message), but they weren't of much help in finding a solution.

The code I've got issues with is as follows:

use egg_mode::tweet;
use rust_bert::pipelines::summarization::SummarizationModel;

fn summarize_text(model: SummarizationModel, text: &str) -> String {
    let output = model.summarize(&[text]);
    // @TODO: output summarization
    match output.is_empty() {
        false => "FALSE".to_string(),
        true => "TRUE".to_string(),
    }
}

#[tokio::main]
async fn main() {
    let model = SummarizationModel::new(Default::default()).unwrap();

    let token = egg_mode::auth::Token::Bearer("obviously not my token".to_string());
    let tweet_id = 1221552460768202756; // example tweet

    println!("Loading tweet [{id}]", id = tweet_id);
    let status = tweet::show(tweet_id, &token).await;
    match status {
        Err(err) => println!("Failed to fetch tweet: {}", err),
        Ok(tweet) => {
            println!(
                "Original tweet:\n{orig}\n\nSummarized tweet:\n{sum}",
                orig = tweet.text,
                sum = summarize_text(model, &tweet.text)
            );
        }
    }
}

Mib asked Jun 23 '20 14:06


2 Answers

Solving the problem

This is a reduced example:

use tokio; // 1.0.2

#[tokio::main]
async fn inner_example() {}

#[tokio::main]
async fn main() {
    inner_example();
}
thread 'main' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.', /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.0.2/src/runtime/enter.rs:39:9

To avoid this, you need to run the code that creates the second Tokio runtime on a completely independent thread. The easiest way to do this is to use std::thread::spawn:

use std::thread;

#[tokio::main]
async fn inner_example() {}

#[tokio::main]
async fn main() {
    thread::spawn(|| {
        inner_example();
    }).join().expect("Thread panicked")
}

For improved performance, you may wish to use a threadpool instead of creating a new thread each time. Conveniently, Tokio itself provides such a threadpool via spawn_blocking:

#[tokio::main]
async fn inner_example() {}

#[tokio::main]
async fn main() {
    tokio::task::spawn_blocking(|| {
        inner_example();
    }).await.expect("Task panicked")
}

In some cases you don't need to actually create a second Tokio runtime and can instead reuse the parent runtime. To do so, you pass in a Handle to the outer runtime. You can optionally use a lightweight executor like futures::executor to block on the result, if you need to wait for the work to finish:

use tokio::runtime::Handle; // 1.0.2

fn inner_example(handle: Handle) {
    futures::executor::block_on(async {
        handle
            .spawn(async {
                // Do work here
            })
            .await
            .expect("Task spawned in Tokio executor panicked")
    })
}

#[tokio::main]
async fn main() {
    let handle = Handle::current();

    tokio::task::spawn_blocking(|| {
        inner_example(handle);
    })
    .await
    .expect("Blocking task panicked")
}

See also:

  • Do a synchronous http client fetch within an actix thread
  • How to create a dedicated threadpool for CPU-intensive work in Tokio?

Avoiding the problem

A better path is to avoid creating nested Tokio runtimes in the first place. Ideally, if a library uses an asynchronous executor, it would also offer the direct asynchronous function so you could use your own executor.

It's worth looking at the API to see if there is a non-blocking alternative, and if not, raising an issue on the project's repository.

You may also be able to reorganize your code so that the Tokio runtimes are not nested but are instead sequential:

struct Data;

#[tokio::main]
async fn inner_example() -> Data {
    Data
}

#[tokio::main]
async fn core(_: Data) {}

fn main() {
    let data = inner_example();
    core(data);
}

Shepmaster answered Oct 31 '22 08:10


I had a similar issue when loading the QA model into warp (which runs on a Tokio runtime). Sequential runtimes still weren't working for me, but I found a solution in the GitHub issues of rust-bert: wrap the initial loading call in task::spawn_blocking. This is fine for me because I can't accept any requests before the model is loaded anyway. A snippet is below in case it helps others.

use tokio::task;
use warp::Filter;

fn with_model(
    qa: QaModel, // alias for Arc<Mutex<QuestionAnsweringModel>>
) -> impl Filter<Extract = (QaModel,), Error = std::convert::Infallible> + Clone {
    warp::any().map(move || qa.clone())
}

#[tokio::main]
async fn main() {
    env_logger::init();

    // NOTE: have to download the model before booting up.
    // The fix: the loading call runs inside spawn_blocking.
    let qa_model: QaModel = task::spawn_blocking(move || {
        log::debug!("setting up qa model config");
        let c = qa_model_config();
        log::debug!("finished setting up qa model config");

        log::debug!("setting up qa model");
        let m = qa_model(c);
        log::debug!("finished setting up qa model");
        m
    })
    .await
    .expect("model loading task panicked");

    let ask_handler = warp::path!("ask")
        .and(warp::get())
        .and(warp::query::<QaQuery>())
        .and(with_model(qa_model))
        .and_then(ask);

    warp::serve(ask_handler).run(([127, 0, 0, 1], 3030)).await;
}

Alex Moore-Niemi answered Oct 31 '22 08:10