I was wondering whether my idea was even implementable: a dynamic-library plugin manager with full support for async Rust.
The implementation is divided into three main parts (github repo):
my-interface, an async_trait (with "normal" traits the approach seems to work fine, though I don't know to what extent):
use async_trait::async_trait;

#[async_trait]
pub trait SayHelloService {
    async fn say_hello(&self);
}
my-master, which loads the .dll/.so and calls a creation function from the library that, given a tokio::runtime::Handle, returns a Box<dyn SayHelloService>:
use my_interface::SayHelloService;
use tokio::{self, runtime::Handle};

#[tokio::main]
async fn main() {
    // `Library::new` is `unsafe` in libloading 0.7 and later
    let lib = unsafe { libloading::Library::new("target/debug/libmy_plugin.so") }
        .expect("load library");
    let new_service: libloading::Symbol<fn(Handle) -> Box<dyn SayHelloService>> =
        unsafe { lib.get(b"new_service") }.expect("load symbol");
    let service1 = new_service(Handle::current());
    let service2 = new_service(Handle::current());
    let _ = tokio::join!(service1.say_hello(), service2.say_hello());
}
my-plugin, which implements SayHelloService, and this is the code that is blocking me:
use async_trait::async_trait;
use my_interface::SayHelloService;
use tokio::{self, runtime::Handle};

#[no_mangle]
pub fn new_service(handle: Handle) -> Box<dyn SayHelloService> {
    Box::new(PluginSayHello::new(handle))
}

pub struct PluginSayHello {
    id: String,
    handle: Handle,
}

impl PluginSayHello {
    fn new(handle: Handle) -> PluginSayHello {
        let id = format!("{:08x}", rand::random::<u32>());
        println!("[{}] Created instance!", id);
        PluginSayHello { id, handle }
    }
}

#[async_trait]
impl SayHelloService for PluginSayHello {
    // this errors with "future cannot be sent between threads safely"
    async fn say_hello(&self) {
        // this should enable you to call tokio::time::sleep but EnterGuard is not Send :(
        // https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.enter
        let _guard = self.handle.enter();
        println!("[{}] Hello from plugin!", self.id);
        let _ = tokio::spawn(async move {
            let _ = tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            println!("sleep 1");
        })
        .await;
    }
}
Not being able to call self.handle.enter() causes all sorts of strange behavior, for example (if you want more info I'll attach the crash log):
#[async_trait]
impl SayHelloService for PluginSayHello {
    async fn say_hello(&self) {
        let id = self.id.clone();
        let _ = self
            .handle
            .spawn_blocking(move || {
                println!("[{}] Hello from plugin!", id);
                // internal code of reqwest just crashes
                // note: `.await` and `?` need an async block returning a Result;
                // this closure is synchronous and returns ()
                let body = reqwest::get("https://www.rust-lang.org")
                    .await?
                    .text()
                    .await?;
                println!("body = {:?}", body);
            })
            .await;
    }
}
I've also written a working implementation of PluginSayHello, but it doesn't feel like a full win to me:
use std::{thread::sleep, time::Duration}; // blocking std sleep, not tokio::time::sleep

#[async_trait]
impl SayHelloService for PluginSayHello {
    async fn say_hello(&self) {
        let id = self.id.clone();
        let _ = self
            .handle
            .spawn(async move {
                println!("[{}] Hello from plugin!", id);
                // calling tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                // errors with "there is no reactor running, must be called from the context of a Tokio 1.x runtime"
                sleep(Duration::new(1, 0)); // blocks the worker thread for 1 s
                println!("slept 1");
                println!("[{}] Hello again from plugin!", id);
            })
            .await;
    }
}
To wrap things up:
This is a deceptively non-trivial scenario.
First, let's step back and explain why things are the way they are. Asynchronous primitives for I/O and timers work best when they are managed together, since some thread has to handle OS events. The Tokio Runtime offers this, of course, but to avoid passing a handle around manually to register timers and I/O calls with the runtime, there is a thread-local "current" variable that stores a handle to the runtime executing the current task. So when you do something Tokio-specific like tokio::time::sleep, it uses that thread-local variable to access the runtime's time driver. If there is no "current" runtime, you get the "there is no reactor running" error.
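To make the thread-local mechanism concrete, here is a minimal std-only sketch of the pattern. It is not Tokio's code: `RuntimeHandle`, `enter`, `current` and `EnterGuard` are hypothetical stand-ins that mimic how a "current runtime" slot can be set for a scope and restored afterwards.

```rust
use std::cell::RefCell;

/// Hypothetical stand-in for a runtime handle (Tokio's real `Handle` differs).
#[derive(Clone, Debug, PartialEq)]
pub struct RuntimeHandle {
    pub name: &'static str,
}

thread_local! {
    // One "current runtime" slot per thread.
    static CURRENT: RefCell<Option<RuntimeHandle>> = RefCell::new(None);
}

/// Restores the previous "current" value when dropped.
pub struct EnterGuard {
    prev: Option<RuntimeHandle>,
}

impl Drop for EnterGuard {
    fn drop(&mut self) {
        let prev = self.prev.take();
        CURRENT.with(|c| *c.borrow_mut() = prev);
    }
}

/// Makes `handle` the current runtime on this thread until the guard is dropped.
pub fn enter(handle: RuntimeHandle) -> EnterGuard {
    let prev = CURRENT.with(|c| c.replace(Some(handle)));
    EnterGuard { prev }
}

/// What a runtime-specific call (like a timer) would consult; errors if nothing is entered.
pub fn current() -> Result<RuntimeHandle, &'static str> {
    CURRENT
        .with(|c| c.borrow().clone())
        .ok_or("there is no reactor running")
}
```

Because the guard restores whatever was there before, nested `enter` calls unwind correctly, which is the behavior the real `Handle::enter` guard is documented to provide.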
This becomes a problem in your plugin system because tokio is linked statically into each of your compiled binaries (the master and the plugin), so each has its own thread-locals: what is "current" in the plugin is not the same as what is "current" in the master, even on the same thread. It appears you have tried to work around this by passing the handle manually and calling .enter(), but that solution isn't quite right.
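The effect of linking the same library into two binaries can be imitated inside a single program by declaring two independent thread-locals, one per hypothetical "copy". The modules `master_copy` and `plugin_copy` and the helper functions are illustrative names only. The point is that setting one copy's "current" value leaves the other copy's slot empty, even on the same thread.

```rust
use std::cell::Cell;

// Each module stands in for one statically linked copy of the runtime library.
pub mod master_copy {
    use super::Cell;
    thread_local! {
        pub static CURRENT_RUNTIME: Cell<Option<u32>> = Cell::new(None);
    }
}

pub mod plugin_copy {
    use super::Cell;
    thread_local! {
        pub static CURRENT_RUNTIME: Cell<Option<u32>> = Cell::new(None);
    }
}

/// The master "enters" runtime `id` through its own copy of the thread-local.
pub fn master_enters(id: u32) {
    master_copy::CURRENT_RUNTIME.with(|c| c.set(Some(id)));
}

/// Code compiled into the master only sees the master's copy.
pub fn master_sees() -> Option<u32> {
    master_copy::CURRENT_RUNTIME.with(|c| c.get())
}

/// Code compiled into the plugin only sees the plugin's copy.
pub fn plugin_sees() -> Option<u32> {
    plugin_copy::CURRENT_RUNTIME.with(|c| c.get())
}
```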
One problem you encountered is that this makes your async function !Send. If you didn't need it to be Send, that could be resolved with #[async_trait(?Send)], but I assume you do. Another problem is that, if tasks can move between threads, handle.enter() only sets the current runtime for the current thread, and your task may at some point be moved to a different thread.
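Both problems follow from the guard being tied to the thread that created it. In the std-only sketch below (hypothetical `enter`/`current`, again not Tokio's API), the guard is made `!Send` with a `PhantomData<*const ()>` field, a common way to opt out of `Send`. The sketch also shows that a value entered on one thread is invisible on another.

```rust
use std::cell::Cell;
use std::marker::PhantomData;
use std::thread;

thread_local! {
    static CURRENT: Cell<Option<u32>> = Cell::new(None);
}

/// Hypothetical guard. The raw-pointer marker makes it `!Send`, so an async fn
/// that holds it across an `.await` produces a `!Send` future.
pub struct EnterGuard {
    prev: Option<u32>,
    _not_send: PhantomData<*const ()>,
}

impl Drop for EnterGuard {
    fn drop(&mut self) {
        CURRENT.with(|c| c.set(self.prev));
    }
}

pub fn enter(runtime_id: u32) -> EnterGuard {
    let prev = CURRENT.with(|c| c.replace(Some(runtime_id)));
    EnterGuard { prev, _not_send: PhantomData }
}

pub fn current() -> Option<u32> {
    CURRENT.with(|c| c.get())
}

/// Enters on the calling thread, then asks a second thread what it sees.
/// Returns (seen on this thread, seen on the other thread).
pub fn seen_here_and_elsewhere(runtime_id: u32) -> (Option<u32>, Option<u32>) {
    let _guard = enter(runtime_id);
    let here = current();
    let elsewhere = thread::spawn(current).join().unwrap();
    (here, elsewhere)
}
```

A task that is resumed on another worker thread after an `.await` is in the same position as the spawned thread here: it would see `None`.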
What you need is for your async task to have the "current" runtime set only while it is executing. You can do this with a Future wrapper:
use std::task::{Context, Poll};
use std::pin::Pin;
use std::future::Future;
use tokio::runtime::Handle;
use pin_project_lite::pin_project;

pin_project! {
    struct WithRuntime<F> {
        runtime: Handle,
        #[pin]
        fut: F,
    }
}

impl<F> WithRuntime<F> {
    fn new(runtime: Handle, fut: F) -> Self {
        Self { runtime, fut }
    }
}

impl<F> Future for WithRuntime<F>
where
    F: Future
{
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        // enter the runtime only for the duration of this poll
        let _guard = this.runtime.enter();
        this.fut.poll(ctx)
    }
}
You could then use it like WithRuntime::new(handle, async { ... }).await (full code here). The exact way to integrate it with your current trait may be awkward, but I think the trait needs to change anyway.
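If you want to experiment with the wrapper idea without Tokio, it can be exercised with std only. In this sketch the thread-local slot, `WithRuntime`, `yield_once` and the tiny busy-polling `block_on` are all assumptions for illustration. The future is boxed with `Box::pin` instead of using `pin-project-lite`, which keeps the wrapper `Unpin` at the cost of one allocation.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

thread_local! {
    // Hypothetical "current runtime" slot (a plain id instead of a real Handle).
    static CURRENT: Cell<Option<u32>> = Cell::new(None);
}

pub fn current() -> Option<u32> {
    CURRENT.with(|c| c.get())
}

/// Sets `runtime` as current only for the duration of each `poll` of `fut`.
pub struct WithRuntime<F> {
    runtime: u32,
    fut: Pin<Box<F>>,
}

impl<F: Future> WithRuntime<F> {
    pub fn new(runtime: u32, fut: F) -> Self {
        Self { runtime, fut: Box::pin(fut) }
    }
}

impl<F: Future> Future for WithRuntime<F> {
    type Output = F::Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        let this = self.get_mut(); // fine: WithRuntime is Unpin because the future is boxed
        let prev = CURRENT.with(|c| c.replace(Some(this.runtime)));
        let out = this.fut.as_mut().poll(cx);
        CURRENT.with(|c| c.set(prev)); // restore, like dropping an enter guard
        out
    }
}

/// Returns Pending once (waking itself), then Ready, forcing a second poll.
pub struct YieldOnce(bool);

pub fn yield_once() -> YieldOnce {
    YieldOnce(false)
}

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, only for this demo.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
    }
}
```

The inner future sees the runtime on both polls, including the one after it yielded. Between polls, the slot is empty again, so nothing leaks to other tasks that happen to run on the same thread.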
Another issue with your current setup is that you assume one runtime will always be calling your service. That may not be true: some configurations, such as Actix-Web, run multiple single-threaded runtimes. I think your code would still work, but it could be suboptimal. So you should probably pass the "current" handle on each async method invocation. You could probably write a macro that handles this transparently for plugin writers.
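One possible shape for passing the caller's handle on every call is sketched below. `Handle` here is a hypothetical stand-in struct, not `tokio::runtime::Handle`. The trait returns a hand-boxed future (roughly what `#[async_trait]` generates) so the example needs only std. A real plugin would run the body inside a runtime-entering wrapper built from the handle it receives.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the caller's runtime handle.
#[derive(Clone, Debug)]
pub struct Handle {
    pub runtime_name: String,
}

pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// The caller passes its *current* handle on each invocation instead of the
/// plugin storing one at construction time.
pub trait SayHelloService: Send + Sync {
    fn say_hello<'a>(&'a self, rt: Handle) -> BoxFuture<'a, String>;
}

pub struct PluginSayHello {
    pub id: String,
}

impl SayHelloService for PluginSayHello {
    fn say_hello<'a>(&'a self, rt: Handle) -> BoxFuture<'a, String> {
        Box::pin(async move {
            // A real plugin would enter `rt` around this body (e.g. via a WithRuntime-style wrapper).
            format!("[{}] hello via {}", self.id, rt.runtime_name)
        })
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, only for this demo.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
    }
}
```

The same service instance can then be driven from different runtimes, and each call uses the runtime of its caller.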
Lastly, be warned that Rust's ABI is not stable. What you have is not guaranteed to work across compiler versions (or even with the same compiler?). It will likely mostly work, but that is not a guarantee. Normally you would design a proper FFI (foreign function interface) layer to communicate through, but the Rust types required (Poll, Context, Box<dyn Future>) are not FFI-safe, so you would need an FFI-safe shim (like the ones from stabby, which you should totally check out) for it to really work (though I'm still not sure what you'd do for Handle).
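As a rough illustration of what an FFI-safe boundary looks like without relying on Rust's unstable ABI, here is a hand-rolled `#[repr(C)]` vtable sketch. It does not use stabby and does not address async or `Handle`. The names `FfiSayHello`, `SayHelloVTable` and `new_ffi_service` are hypothetical. Only C-ABI function pointers and raw pointers cross the boundary, and the plugin supplies its own `drop` so memory is freed by the allocator that created it.

```rust
use std::ffi::c_void;

/// C-compatible table of function pointers for one service type.
#[repr(C)]
pub struct SayHelloVTable {
    pub say_hello: extern "C" fn(this: *const c_void) -> u32,
    pub drop: extern "C" fn(this: *mut c_void),
}

/// C-compatible "trait object": an opaque data pointer plus a vtable pointer.
#[repr(C)]
pub struct FfiSayHello {
    pub data: *mut c_void,
    pub vtable: &'static SayHelloVTable,
}

// ---- plugin side (hypothetical) ----
struct PluginSayHello {
    id: u32,
}

extern "C" fn plugin_say_hello(this: *const c_void) -> u32 {
    // SAFETY: `this` was created from a Box<PluginSayHello> in `new_ffi_service`.
    let plugin = unsafe { &*(this as *const PluginSayHello) };
    plugin.id
}

extern "C" fn plugin_drop(this: *mut c_void) {
    // SAFETY: reclaims the Box created in `new_ffi_service`; called exactly once.
    drop(unsafe { Box::from_raw(this as *mut PluginSayHello) });
}

static PLUGIN_VTABLE: SayHelloVTable = SayHelloVTable {
    say_hello: plugin_say_hello,
    drop: plugin_drop,
};

/// Constructor the master would look up; a real plugin would also export it unmangled.
pub extern "C" fn new_ffi_service(id: u32) -> FfiSayHello {
    let data = Box::into_raw(Box::new(PluginSayHello { id })) as *mut c_void;
    FfiSayHello { data, vtable: &PLUGIN_VTABLE }
}

// ---- master side ----
impl FfiSayHello {
    pub fn say_hello(&self) -> u32 {
        (self.vtable.say_hello)(self.data)
    }
}

impl Drop for FfiSayHello {
    fn drop(&mut self) {
        (self.vtable.drop)(self.data);
    }
}
```

This is the kind of boilerplate that crates like stabby aim to generate for you, and it gets considerably harder once futures, wakers and runtime handles have to cross the boundary.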
And there's probably other stuff I've missed, since I'm not sure why handle.spawn(async { ... }) doesn't work.