Background:
I am having trouble integrating sqlx with juniper subscriptions.
I am getting a Pin<Box<dyn Stream<Item = Result<User, sqlx::Error>> + 'e + Send>> from sqlx::query::QueryAs::fetch(). juniper needs subscriptions to be returned as Pin<Box<dyn Stream<Item = Result<User, juniper::FieldError>> + Send>>.
Note the change from Result<User, sqlx::Error> to Result<User, juniper::FieldError>. Using map_err() from futures::TryStreamExt, I created the following code to perform the query and transform the error type.
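use std::pin::Pin;

use futures::{Stream, StreamExt, TryStreamExt};
use juniper::{graphql_value, FieldError};

type UsersStream =
    Pin<Box<dyn Stream<Item = Result<User, FieldError>> + Send>>;

#[juniper::graphql_subscription(Context = Context)]
impl SubscriptionRoot {
    async fn users(context: &Context) -> UsersStream {
        // The query borrows the pool from `context` for as long as the stream lives.
        sqlx::query_as!(User, "SELECT * FROM users")
            .fetch(&context.pool)
            // Convert sqlx::Error into juniper's FieldError.
            .map_err(|e| {
                FieldError::new(
                    "Database error",
                    graphql_value!(format!("{}", e)))
            })
            .boxed()
    }
}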
This fails with the following error on compile:
error[E0759]: `executor` has lifetime `'ref_e` but it needs to satisfy a `'static` lifetime requirement
  --> server/src/graphql/subscription.rs:27:1
   |
27 |   #[juniper::graphql_subscription(Context = Context)]
   |   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |   |
   |   this data with lifetime `'ref_e`...
   |   ...is captured here...
...
63 | /         sqlx::query_as!(User, "SELECT * FROM users")
64 | |             .fetch(&context.pool)
65 | |             .map_err(|e| {
66 | |                 FieldError::new(
...  |
69 | |             })
70 | |             .boxed()
   | |____________________- ...and is required to live as long as `'static` here
   |
   = note: this error originates in an attribute macro (in Nightly builds, run with -Z macro-backtrace for more info)

error: aborting due to previous error
I am not familiar enough with Streams or lifetimes to understand the implications of this error. After looking into this some more, it seems that 'ref_e is the lifetime of the subscription's reference to juniper's Executor.
Attempts:
juniper::Context as discussed in graphql-rust/juniper#143.
Versions:
sqlx-0.4.1
juniper pinned to commit cd66bdb on master
Your code is not exactly like mine, but I think the same solution applies here: try cloning the pool before using it.
type UsersStream =
    Pin<Box<dyn Stream<Item = Result<User, FieldError>> + Send>>;

#[juniper::graphql_subscription(Context = Context)]
impl SubscriptionRoot {
    async fn users(context: &Context) -> UsersStream {
        // Clone the pool so the query no longer borrows from `context`.
        let pool = context.pool.clone();
        sqlx::query_as!(User, "SELECT * FROM users")
            .fetch(&pool)
            .map_err(|e| {
                FieldError::new(
                    "Database error",
                    graphql_value!(format!("{}", e)))
            })
            .boxed()
    }
}
The answer by Mathieu is correct, so I'll explain the reason behind the error.
The fundamental issue is that by returning UsersStream you are moving data out of the function users(..). The caller of users(..) now owns the returned data and (theoretically) can do whatever it wants with it, including keeping it for the lifetime of the application, i.e. treating it as 'static. In fact, because UsersStream is a boxed trait object with no explicit lifetime bound, the compiler assumes 'static for it. But if UsersStream holds a reference to data (context.pool) with a limited lifetime, what happens when the owner of pool drops it? The stream would be left with a dangling reference to data that no longer exists. The compiler rejects the code to prevent that situation.
So what you can do here is somehow pass owned data (pool) instead of a reference, so that the data lives as long as the UsersStream that now owns it. clone() does exactly this: it creates an owned copy of the data (either a reference-counted handle or a full copy).
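The ownership rule described above can be sketched without sqlx or juniper. The example below is only an illustration using the standard library: Pool, Context, Users and the rows field are hypothetical stand-ins (a reference-counted handle in place of a real connection pool, and a boxed iterator in place of a boxed stream), not the real APIs.

```rust
use std::sync::Arc;

// Hypothetical stand-in for a connection pool. Cloning it only bumps a
// reference count (an assumption made for this sketch).
#[derive(Clone)]
struct Pool {
    rows: Arc<Vec<String>>,
}

// Hypothetical stand-in for the context that the resolver only borrows.
struct Context {
    pool: Pool,
}

// With no explicit lifetime, this means `Box<dyn Iterator + Send + 'static>`,
// so the boxed value must not hold a reference into `Context`.
type Users = Box<dyn Iterator<Item = String> + Send>;

fn users(context: &Context) -> Users {
    // Clone the handle so the iterator owns its data instead of borrowing `context`.
    let pool = context.pool.clone();
    // `move` transfers the cloned handle into the closure.
    Box::new((0..pool.rows.len()).map(move |i| pool.rows[i].clone()))
}

fn main() {
    let context = Context {
        pool: Pool {
            rows: Arc::new(vec!["alice".to_string(), "bob".to_string()]),
        },
    };
    let stream = users(&context);
    // The iterator stays valid after the context is gone: it owns a clone.
    drop(context);
    for name in stream {
        println!("{}", name);
    }
}
```

If the closure captured `&context.pool` instead of the cloned handle, the compiler would reject `users` because the borrowed data could not satisfy the 'static bound on the boxed return type.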