I have a Tokio client that talks to a remote server and is supposed to keep the connection alive permanently. I've implemented the initial authentication handshake and found that when my test terminates, I get an odd panic:
---- test_connect_without_database stdout ----
thread 'test_connect_without_database' panicked at 'Cannot drop a runtime in a context where blocking is not allowed. This happens when a runtime is dropped from within an asynchronous context.', /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.3.5/src/runtime/blocking/shutdown.rs:51:21
I'm at an absolute loss as to what might be causing this so I don't know what other bits of code to add for context.
Here's my minimal reproducible example (playground):
use std::cell::RefCell;
use std::net::{IpAddr, SocketAddr};
use tokio::net::TcpStream;
use tokio::prelude::*;
use tokio::runtime;
#[derive(PartialEq, Debug)]
pub struct Configuration {
/// Database username.
username: String,
/// Database password.
password: String,
/// Database name.
db_name: String,
/// IP address for the remove server.
address: IpAddr,
/// Remote server port.
port: u16,
/// Number of connections to open.
connections: u16,
}
impl Configuration {
pub fn new(
username: &str,
password: &str,
db_name: &str,
address: &str,
port: u16,
connections: u16,
) -> Result<Configuration, Box<dyn std::error::Error>> {
let address = address.to_string().parse()?;
let configuration = Configuration {
username: username.to_string(),
password: password.to_string(),
db_name: db_name.to_string(),
address,
port,
connections,
};
Ok(configuration)
}
pub fn socket(&self) -> SocketAddr {
SocketAddr::new(self.address, self.port)
}
}
#[derive(Debug)]
pub struct Session {
configuration: Configuration,
runtime: RefCell<runtime::Runtime>,
}
impl Session {
fn new(config: Configuration) -> Result<Session, io::Error> {
let runtime = runtime::Builder::new_multi_thread()
.worker_threads(6)
.enable_all()
.build()?;
let session = Session {
configuration: config,
runtime: RefCell::new(runtime),
};
Ok(session)
}
fn configuration(&self) -> &Configuration {
&self.configuration
}
}
#[derive(Debug)]
pub(crate) struct Connection<'a> {
/// Socket uses to read and write from.
session: &'a Session,
/// Connection to the remote server.
stream: TcpStream,
/// Flag that indicates whether the connection is live.
live: bool,
}
impl<'a> Connection<'a> {
async fn new(session: &Session) -> Result<Connection<'_>, Box<dyn std::error::Error>> {
let mut stream = TcpStream::connect(session.configuration().socket()).await?;
let conn = Connection {
session,
stream,
live: true,
};
Ok(conn)
}
fn live(&self) -> bool {
self.live
}
}
#[tokio::test]
async fn test_connect_without_database() -> Result<(), Box<dyn std::error::Error>> {
let config = Configuration::new("rust", "", "", "127.0.0.1", 2345, 2).unwrap();
let session = Session::new(config).unwrap();
let conn = Connection::new(&session).await?;
assert!(conn.live());
Ok(())
}
fn main() {
println!("{}", 65u8 as char)
}
The Tokio runtime. The runtime provides an I/O driver, task scheduler, timer, and blocking pool, necessary for running asynchronous tasks. Instances of Runtime can be created using new, or Builder .
It is also possible to create an extra Tokio runtime dedicated to CPU-bound tasks, but if you do this, you should be careful that the extra runtime runs only CPU-bound tasks, as IO-bound tasks on that runtime will behave poorly. Hint: If using rayon, you can use a oneshot channel to send the result back to Tokio when the rayon task finishes.
thread 'tokio-runtime-worker' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like block_on) attempted to block the current thread while the thread is being used to drive asynchronous tasks.'
Usually, dropping a Runtime handle is sufficient as tasks are able to shutdown in a timely fashion. However, dropping a Runtime will wait indefinitely for all tasks to terminate, and there are cases where a long blocking task has been spawned, which can block dropping Runtime.
As the error message states:
This happens when a runtime is dropped from within an asynchronous context
You have created nested runtimes:
tokio::test
runtime::Builder::new_multi_thread
The second runtime is owned by Session
which is dropped at the end of the asynchronous test. You can observe this by skipping the destructor using mem::forget
:
#[tokio::test]
async fn test_connect_without_database() -> Result<(), Box<dyn std::error::Error>> {
let config = Configuration::new("rust", "", "", "127.0.0.1", 2345, 2).unwrap();
let session = Session::new(config).unwrap();
// Note that the assert was removed!
std::mem::forget(session);
Ok(())
}
Don't spawn nested runtimes and don't drop one runtime from within another.
See also:
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With