How to run an asynchronous task from a non-main thread in Tokio?

use std::thread;
use tokio::task; // 0.3.4

#[tokio::main]
async fn main() {
    thread::spawn(|| {
        task::spawn(async {
            println!("123");
        });
    })
    .join();
}

When compiling I get a warning:

warning: unused `std::result::Result` that must be used
  --> src/main.rs:6:5
   |
6  | /     thread::spawn(|| {
7  | |         task::spawn(async {
8  | |             println!("123");
9  | |         });
10 | |     })
11 | |     .join();
   | |____________^
   |
   = note: `#[warn(unused_must_use)]` on by default
   = note: this `Result` may be an `Err` variant, which should be handled

And when executing I get an error:

thread '<unnamed>' panicked at 'must be called from the context of Tokio runtime configured with either `basic_scheduler` or `threaded_scheduler`', src/main.rs:7:9
asked Apr 18 '20 16:04 by ibse


2 Answers

The key piece is that you need to get a Tokio Handle. This is a reference to a Runtime and it allows you to spawn asynchronous tasks from outside of the runtime.

When using #[tokio::main], the simplest way to get a Handle is to call Handle::current before spawning another thread, then give the handle to each thread that might want to start an asynchronous task:

use std::thread;
use tokio::runtime::Handle; // 0.3.4

#[tokio::main]
async fn main() {
    let threads: Vec<_> = (0..3)
        .map(|thread_id| {
            let handle = Handle::current();

            thread::spawn(move || {
                eprintln!("Thread {} started", thread_id);

                for task_id in 0..3 {
                    handle.spawn(async move {
                        eprintln!("Thread {} / Task {}", thread_id, task_id);
                    });
                }

                eprintln!("Thread {} finished", thread_id);
            })
        })
        .collect();

    for t in threads {
        t.join().expect("Thread panicked");
    }
}
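The `.expect("Thread panicked")` call on `join()` above is also what avoids the `unused_must_use` warning from the question: `join()` returns a `Result` that is `Err` when the thread panicked. As a minimal, std-only sketch of handling that result (the helper name `run_on_thread` is hypothetical, introduced only for illustration):

```rust
use std::thread;

// Hypothetical helper: runs `work` on a new OS thread and converts a panic
// in that thread into an error message instead of discarding the `Result`
// returned by `join()`.
fn run_on_thread<F>(work: F) -> Result<i32, String>
where
    F: FnOnce() -> i32 + Send + 'static,
{
    thread::spawn(work)
        .join()
        .map_err(|_| String::from("worker thread panicked"))
}

fn main() {
    // The closure returns normally, so `join()` yields `Ok`.
    match run_on_thread(|| 1 + 2) {
        Ok(value) => println!("worker returned {}", value),
        Err(msg) => eprintln!("{}", msg),
    }
}
```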

You could also create a global, mutable singleton of a Mutex<Option<Handle>>, initialize it to None, then set it to Some early in your tokio::main function. Then, you can grab that global variable, unwrap it, and clone the Handle when you need it:

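use once_cell::sync::Lazy; // 1.5.2
use std::sync::Mutex;
use tokio::runtime::Handle; // 0.3.4

static HANDLE: Lazy<Mutex<Option<Handle>>> = Lazy::new(Default::default);
// Early in your #[tokio::main] function, store the runtime's handle:
*HANDLE.lock().unwrap() = Some(Handle::current());
// Later, on any thread, clone the stored handle when you need it:
let handle = HANDLE.lock().unwrap().as_ref().unwrap().clone();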

See also:

  • How do I add tasks to a Tokio event loop that is running on another thread?
  • How do I synchronously return a value calculated in an asynchronous Future in stable Rust?
  • How to create a dedicated threadpool for CPU-intensive work in Tokio?
  • How do I create a global, mutable singleton?
answered Oct 07 '22 15:10 by Shepmaster


I have a job processing app that exposes a web API to add jobs and process them but the API request should not wait for the job to finish (it could take a while). I use Server-Sent Events to broadcast the job result. This means the main API server is executing inside main with #[tokio::main], but where should I be running the job executor? In the job executor, I will have plenty of waiting: things like downloading. They will interfere with the web API server. The crucial question is how do I even start both executions in parallel?

In this scenario, you need to create a separate thread with thread::spawn and create a Tokio executor inside it. The error occurs because there is no Tokio executor (runtime) inside your second thread. You need to create one manually and tell it to run your tasks. The easiest way is to use the Runtime API:

use tokio::runtime::Runtime; // 0.2.23

// Create the runtime
let rt = Runtime::new().unwrap();

// Spawn a future onto the runtime
rt.spawn(async {
    println!("now running on a worker thread");
});

In your main thread, an executor is already available with the use of #[tokio::main]. Prior to the addition of this attribute, the runtime was created manually.

If you want to stick with the async/await philosophy, you can use join:

use tokio; // 0.2.23

#[tokio::main]
async fn main() {
    let (_, _) = tokio::join!(start_server_listener(), start_job_processor());
}

This is why most answers are questioning your approach. Although it is very rare, I believe there are scenarios where you want an async runtime on another thread while also having the benefit of configuring that runtime manually.

answered Oct 07 '22 14:10 by evanxg852000