
How to create a dedicated threadpool for CPU-intensive work in Tokio?

I have a Rust async server based on the Tokio runtime. It has to process a mix of latency-sensitive I/O-bound requests, and heavy CPU-bound requests.

I don't want to let the CPU-bound tasks monopolize the Tokio runtime and starve the I/O bound tasks, so I'd like to offload the CPU-bound tasks to a dedicated, isolated threadpool (isolation is the key here, so spawn_blocking/block_in_place on one shared threadpool are insufficient). How can I create such a threadpool in Tokio?

A naive approach of starting two runtimes runs into an error:

thread 'tokio-runtime-worker' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like block_on) attempted to block the current thread while the thread is being used to drive asynchronous tasks.'

use tokio; // 0.2.20

fn main() {
    let mut main_runtime = tokio::runtime::Runtime::new().unwrap();
    let cpu_pool = tokio::runtime::Builder::new().threaded_scheduler().build().unwrap();
    let cpu_pool = cpu_pool.handle().clone(); // this is the fix/workaround!

    main_runtime.block_on(main_runtime.spawn(async move {
        cpu_pool.spawn(async {}).await
    }))
    .unwrap().unwrap();
}

Can Tokio allow two separate runtimes? Is there a better way to create an isolated CPU pool in Tokio?

Kornel asked May 12 '20 13:05


3 Answers

While Tokio already has a threadpool, the documentation of Tokio advises:

If your code is CPU-bound and you wish to limit the number of threads used to run it, you should run it on another thread pool such as rayon. You can use an oneshot channel to send the result back to Tokio when the rayon task finishes.

So, if you want a threadpool for heavy CPU work, a good approach is to run that work on a crate like Rayon and send the result back to the Tokio task through a channel.
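The "separate pool plus a channel for the result" pattern can be sketched with the standard library alone. This is only an illustration under stated assumptions: the hand-rolled CpuPool type and the sum_of_squares workload are hypothetical names, the pool stands in for Rayon, and std::sync::mpsc stands in for the oneshot channel. In a real Tokio service you would await an async oneshot receiver rather than block on recv().

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

// A boxed job that a worker thread runs exactly once.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical fixed-size pool reserved for CPU-bound work (stand-in for Rayon).
pub struct CpuPool {
    sender: Option<mpsc::Sender<Job>>,
    workers: Vec<thread::JoinHandle<()>>,
}

impl CpuPool {
    pub fn new(threads: usize) -> Self {
        assert!(threads > 0, "the pool needs at least one thread");
        let (sender, receiver) = mpsc::channel::<Job>();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..threads)
            .map(|_| {
                let receiver = Arc::clone(&receiver);
                thread::spawn(move || loop {
                    // The lock is held only while fetching the next job.
                    let job = receiver.lock().unwrap().recv();
                    match job {
                        Ok(job) => job(),
                        Err(_) => break, // sender dropped: the pool is shutting down
                    }
                })
            })
            .collect();
        CpuPool { sender: Some(sender), workers }
    }

    /// Queue `work` on the pool and return a receiver for its single result.
    pub fn spawn<T, F>(&self, work: F) -> mpsc::Receiver<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        let job: Job = Box::new(move || {
            // Ignore the send error if the caller stopped waiting for the result.
            let _ = tx.send(work());
        });
        self.sender
            .as_ref()
            .expect("sender is only taken in drop")
            .send(job)
            .expect("worker threads have exited");
        rx
    }
}

impl Drop for CpuPool {
    fn drop(&mut self) {
        // Closing the job channel makes each worker's recv() fail, ending its loop.
        drop(self.sender.take());
        for worker in self.workers.drain(..) {
            let _ = worker.join();
        }
    }
}

/// Stand-in for a heavy computation.
pub fn sum_of_squares(n: u64) -> u64 {
    (1..=n).map(|i| i * i).sum()
}

fn main() {
    let pool = CpuPool::new(2);
    let pending = pool.spawn(|| sum_of_squares(1_000));
    // Blocking here is for the demo only; an async task would await instead.
    let result = pending.recv().unwrap();
    println!("sum of squares: {}", result);
}
```

Because the pool owns its own threads, a burst of CPU-heavy jobs only queues up behind other CPU-heavy jobs and never occupies a runtime worker thread.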

Stargateur answered Oct 20 '22 11:10


Tokio's error message was misleading. The problem was that the Runtime object was being dropped in an async context.

The workaround is to spawn tasks on the other runtime through a Handle rather than through the Runtime itself.

fn main() {
    let mut main_runtime = tokio::runtime::Runtime::new().unwrap();
    let cpu_pool = tokio::runtime::Builder::new().threaded_scheduler().build().unwrap();

    // this is the fix/workaround:
    let cpu_pool = cpu_pool.handle().clone(); 

    main_runtime.block_on(main_runtime.spawn(async move {
        cpu_pool.spawn(async {}).await
    }))
    .unwrap().unwrap();
}

Kornel answered Oct 20 '22 11:10


Starting a Tokio runtime already creates a threadpool. The relevant options are:

  • Builder::core_threads (default in 0.2.20 is the number of CPU cores)
  • Builder::max_threads (default in 0.2.20 is 512)

Roughly speaking, core_threads controls how many threads will be used to process asynchronous code. max_threads - core_threads is how many threads will be used for blocking work (emphasis mine):

Otherwise as core_threads are always active, it limits additional threads (e.g. for blocking annotations) as max_threads - core_threads.

You can also specify these options through the tokio::main attribute.

You can then annotate blocking code with either of:

  • task::spawn_blocking
  • task::block_in_place

See also:

  • What is the best approach to encapsulate blocking I/O in future-rs?

spawn_blocking can easily take all of the threads available in the one and only runtime, forcing other futures to wait on them

You can make use of techniques like a Semaphore to restrict maximum parallelism in this case.
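The idea of capping parallelism with a semaphore can be sketched as follows. This is a blocking, std-only illustration, not Tokio's API: the Semaphore type and the peak_parallelism helper are hypothetical names written for this example, and the sleep is a stand-in for heavy work. In async code the same role would be played by an async semaphore such as tokio::sync::Semaphore, acquired before handing work to spawn_blocking.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical blocking counting semaphore built from a Mutex and a Condvar.
pub struct Semaphore {
    permits: Mutex<usize>,
    available: Condvar,
}

impl Semaphore {
    pub fn new(permits: usize) -> Self {
        Semaphore { permits: Mutex::new(permits), available: Condvar::new() }
    }

    /// Block until a permit is free, then take it.
    pub fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    /// Return a permit and wake one waiting thread.
    pub fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.available.notify_one();
    }
}

/// Start `jobs` threads but let at most `limit` of them do work at once.
/// Returns the highest number of jobs observed running at the same time.
pub fn peak_parallelism(jobs: usize, limit: usize) -> usize {
    let semaphore = Arc::new(Semaphore::new(limit));
    let running = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..jobs)
        .map(|_| {
            let semaphore = Arc::clone(&semaphore);
            let running = Arc::clone(&running);
            let peak = Arc::clone(&peak);
            thread::spawn(move || {
                semaphore.acquire();
                let now = running.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(10)); // stand-in for heavy work
                running.fetch_sub(1, Ordering::SeqCst);
                semaphore.release();
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    peak.load(Ordering::SeqCst)
}

fn main() {
    let peak = peak_parallelism(8, 2);
    println!("peak parallelism: {}", peak);
    assert!(peak <= 2);
}
```

Note that a semaphore only bounds how many blocking jobs run at once on the shared pool. It does not give the isolation of a separate pool, so which approach fits depends on how strict the separation has to be.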

Shepmaster answered Oct 20 '22 10:10