I have a Rust async server based on the Tokio runtime. It has to process a mix of latency-sensitive I/O-bound requests, and heavy CPU-bound requests.
I don't want to let the CPU-bound tasks monopolize the Tokio runtime and starve the I/O-bound tasks, so I'd like to offload the CPU-bound tasks to a dedicated, isolated threadpool (isolation is the key here, so spawn_blocking/block_in_place on one shared threadpool are insufficient). How can I create such a threadpool in Tokio?
A naive approach of starting two runtimes runs into an error:
thread 'tokio-runtime-worker' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like block_on) attempted to block the current thread while the thread is being used to drive asynchronous tasks.'
use tokio; // 0.2.20
fn main() {
    let mut main_runtime = tokio::runtime::Runtime::new().unwrap();
    let cpu_pool = tokio::runtime::Builder::new().threaded_scheduler().build().unwrap();
    let cpu_pool = cpu_pool.handle().clone(); // this is the fix/workaround!
    main_runtime.block_on(main_runtime.spawn(async move {
        cpu_pool.spawn(async {}).await
    }))
    .unwrap().unwrap();
}
Can Tokio allow two separate runtimes? Is there a better way to create an isolated CPU pool in Tokio?
Tokio's own threadpool is geared toward asynchronous work. Its ThreadPool documentation describes it as "optimized for use cases that involve multiplexing large number of independent tasks that perform short(ish) amounts of computation and are mainly waiting on I/O, i.e. the Tokio use case", and notes that users of ThreadPool will usually not create pool instances themselves. At the core, ThreadPool uses a work-stealing based scheduling strategy: when a task is spawned from a thread that is not part of the pool, it is randomly assigned to a worker thread; when spawned from inside the pool, it is assigned to the current worker.
While Tokio already has a threadpool, the documentation of Tokio advises:
If your code is CPU-bound and you wish to limit the number of threads used to run it, you should run it on another thread pool such as rayon. You can use an oneshot channel to send the result back to Tokio when the rayon task finishes.
So, if you want to create a threadpool to make heavy use of CPU, a good way is to use a crate like Rayon and send the result back to the Tokio task.
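For example, here is a minimal sketch of that pattern, assuming rayon has been added as a dependency; fibonacci is just a hypothetical stand-in for the actual CPU-heavy work:

use tokio::sync::oneshot;

// Hypothetical stand-in for the real CPU-heavy computation.
fn fibonacci(n: u64) -> u64 {
    (0..n).fold((0u64, 1u64), |(a, b), _| (b, a.wrapping_add(b))).0
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    // Run the CPU-bound work on Rayon's global pool, fully isolated from the
    // Tokio workers that drive the latency-sensitive I/O tasks.
    rayon::spawn(move || {
        let result = fibonacci(40);
        // The receiver may have been dropped; ignore the send error in that case.
        let _ = tx.send(result);
    });

    // Awaiting the oneshot receiver does not block a Tokio worker thread.
    let result = rx.await.expect("the rayon task was dropped or panicked");
    println!("fib(40) = {}", result);
}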
Tokio's error message was misleading. The problem was due to the Runtime object being dropped in an async context. The workaround is to use a Handle, not the Runtime directly, for spawning tasks on the other runtime.
fn main() {
    let mut main_runtime = tokio::runtime::Runtime::new().unwrap();
    // Build a second runtime to act as the isolated CPU pool.
    let cpu_pool = tokio::runtime::Builder::new().threaded_scheduler().build().unwrap();
    // This is the fix/workaround: keep the Runtime itself alive in main() and move
    // only a cloned Handle into async code, so the Runtime is never dropped there.
    let cpu_pool = cpu_pool.handle().clone();
    main_runtime.block_on(main_runtime.spawn(async move {
        cpu_pool.spawn(async {}).await
    }))
    .unwrap().unwrap();
}
Starting a Tokio runtime already creates a threadpool. The relevant options are:
- Builder::core_threads (default in 0.2.20 is the number of CPU cores)
- Builder::max_threads (default in 0.2.20 is 512)
Roughly speaking, core_threads controls how many threads will be used to process asynchronous code, and max_threads - core_threads is how many threads will be used for blocking work (emphasis mine):
"Otherwise as core_threads are always active, it limits additional threads (e.g. for blocking annotations) as max_threads - core_threads."
You can also specify these options through the tokio::main attribute.
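For example, a minimal sketch using the 0.2 Builder API (the thread counts here are arbitrary):

use tokio::runtime::Builder;

fn main() {
    // 4 threads drive async tasks; up to 4 more (max_threads - core_threads)
    // are available for spawn_blocking / block_in_place work.
    let mut runtime = Builder::new()
        .threaded_scheduler()
        .core_threads(4)
        .max_threads(8)
        .enable_all()
        .build()
        .unwrap();

    runtime.block_on(async {
        // application code
    });
}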
You can then annotate blocking code with either of:
- task::spawn_blocking
- task::block_in_place
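For instance, a rough sketch (the summation is only a placeholder for real CPU-bound work; block_in_place requires the threaded scheduler):

use tokio::task;

#[tokio::main]
async fn main() {
    // spawn_blocking moves the closure onto the runtime's extra blocking threads
    // (the max_threads - core_threads budget described above).
    let a = task::spawn_blocking(|| (0..10_000_000u64).sum::<u64>()).await.unwrap();

    // block_in_place runs the closure on the current worker thread, after asking
    // the scheduler to move that worker's other tasks elsewhere.
    let b = task::block_in_place(|| (0..10_000_000u64).sum::<u64>());

    println!("{} {}", a, b);
}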
Note that spawn_blocking can easily take all of the threads available in the one and only runtime, forcing other futures to wait on them. You can make use of techniques like a Semaphore to restrict maximum parallelism in this case.
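A minimal sketch of that idea, written against the tokio 0.2 API from the question (in current Tokio, Semaphore::acquire returns a Result instead); the limit of 2 and the fold are arbitrary:

use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::task;

#[tokio::main]
async fn main() {
    // Allow at most 2 CPU-heavy jobs to occupy blocking threads at a time.
    let limit = Arc::new(Semaphore::new(2));

    let mut handles = Vec::new();
    for i in 0..8u64 {
        let limit = Arc::clone(&limit);
        handles.push(tokio::spawn(async move {
            // Hold a permit for the duration of the blocking job.
            let _permit = limit.acquire().await;
            task::spawn_blocking(move || (0..1_000_000u64).fold(i, |a, b| a.wrapping_add(b)))
                .await
                .unwrap()
        }));
    }

    for h in handles {
        println!("{}", h.await.unwrap());
    }
}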