When a request comes in, I have to process data in a highly parallelizable way. The processing is a mix of some CPU-intensive work (1ms bursts), and potentially needing a blocking web request (library out of my control). The function is exposed to me as a sync function, but uses block_in_place() under the hood when blocking.
Say I have the following example:
```rust
async fn handle_request(data: &Bytes) {
    let mut results = vec![];
    for i in 0..1000 {
        // do_heavy_calc_that_might_block possibly needs to do a blocking
        // web request, and uses block_in_place when it does
        let processed = do_heavy_calc_that_might_block(data, i);
        let result = do_something_with_res(processed).await;
        results.push(result);
    }
    results
}
```
I'd love to use either tokio::task::spawn() or spawn_blocking(), but both had problems in testing. If I use spawn with something like:
```rust
async fn handle_request(data: &Bytes) {
    let mut tasks = vec![];
    for i in 0..1000 {
        tasks.push(tokio::task::spawn(async move {
            let processed = do_heavy_calc_that_might_block(data, i);
            let result = do_something_with_res(processed).await;
            result
        }));
    }
    let results = futures_util::future::join_all(tasks).await;
    results
}
```
then when do_heavy_calc_that_might_block runs, if too many of the chunks need to block, they all call block_in_place(), the runtime gets starved, and nothing ever completes.
If I switch to spawn_blocking instead, I can't pass it an async function, so do_something_with_res can't .await. Obviously I could return the processed data out of the task and call do_something_with_res after the loop, but the actual code in the task goes back and forth between blocking and async multiple times, so I was looking for something more general.
I've also tried using Rayon instead, but I end up not parallelizing as far as I'd like. Since the blocking calls can take time, they clog up Rayon's thread pool as well, and since they're mixed in with async calls, it would be nice to take advantage of the tokio runtime I'm already working within...
The regular Tokio worker thread pool, and Rayon's thread pool, are both designed around the assumption that each task they run is either busy using the CPU, or yielding. If you put blocking-on-network tasks on either of them, you're going to get poor performance. You need to use something other than a thread pool that has been sized for CPU parallelism.
spawn_blocking() is a good way to do that. In order to interact with async code from within the blocking task, you can call Handle::block_on(). (If you use a non-Tokio thread pool, e.g. rayon::spawn(), then be sure to call Handle::current() to get the handle from Tokio before you spawn, otherwise there will be no Tokio runtime context available.)
Note that the number of max_blocking_threads also affects how your spawn_blocking tasks execute. The default is 512, so you're unlikely to hit it unless you've got a lot of work to do, but if it does come up, you might want to impose an explicit limit in your application logic: tasks waiting to run still take up memory, network servers may not appreciate too many incoming connections, and you've only got so many CPU cores anyway. At that point you may find that overall throughput is improved by intentionally queueing things within your application logic rather than throwing everything you've got at Tokio.