Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How does one use `sqlx` with `juniper` subscriptions in Rust?

Background:

I am having trouble integrating sqlx with juniper subscriptions.

I am getting a Pin<Box<dyn Stream<Item = Result<User, sqlx::Error>> + 'e + Send>> from sqlx::query::QueryAs::fetch().

juniper needs subscriptions to be returned as Pin<Box<dyn Stream<Item = Result<User, juniper::FieldError>> + Send>>.

Note the change from Result<User, sqlx::Error> to Result<User, juniper::FieldError>. Using map_err() from futures::TryStreamExt, I created the following code to perform the query and transform the error type.

type UsersStream =
    Pin<Box<dyn Stream<Item = Result<User, FieldError>> + Send>>;

#[juniper::graphql_subscription(Context = Context)]
impl SubscriptionRoot {
    async fn users(context: &Context) -> UsersStream {
        let sqlx::query_as!(User, "SELECT * FROM users")
            .fetch(&context.pool)
            .map_err(|e| {
                FieldError::new(
                    "Database error",
                    graphql_value!(format!("{}", e)))
            })
            .boxed()
    }
}

This fails with the following error on compile:

error[E0759]: `executor` has lifetime `'ref_e` but it needs to satisfy a `'static` lifetime requirement
  --> server/src/graphql/subscription.rs:27:1
   |
27 |   #[juniper::graphql_subscription(Context = Context)]
   |   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |   |
   |   this data with lifetime `'ref_e`...
   |   ...is captured here...
...
63 | /         sqlx::query_as!(User, "SELECT * FROM users")
64 | |             .fetch(&context.pool)
65 | |             .map_err(|e| {
66 | |                 FieldError::new(
...  |
69 | |             })
70 | |             .boxed()
   | |____________________- ...and is required to live as long as `'static` here
   |
   = note: this error originates in an attribute macro (in Nightly builds, run with -Z macro-backtrace for more info)

error: aborting due to previous error

I am not familiar enough with Streams or lifetimes to understand the implications of this error. After looking into this some more, it seems that ref_e is the lifetime of the subscription's reference to juniper's Executor.

Attempts:

  • Providing a lifetime to juniper::Context as discussed in graphql-rust/juniper#143.
  • Higher-ranked trait bounds

Versions:

  • sqlx-0.4.1
  • juniper pinned to commit cd66bdb on master
like image 706
ehegnes Avatar asked Dec 02 '20 03:12

ehegnes


2 Answers

your code is not exactly like mine but I think the solution can apply here too, try to clone the pool before using it:

type UsersStream =
    Pin<Box<dyn Stream<Item = Result<User, FieldError>> + Send>>;

#[juniper::graphql_subscription(Context = Context)]
impl SubscriptionRoot {
    async fn users(context: &Context) -> UsersStream {
        let pool = context.pool.clone();
        let sqlx::query_as!(User, "SELECT * FROM users")
            .fetch(&pool)
            .map_err(|e| {
                FieldError::new(
                    "Database error",
                    graphql_value!(format!("{}", e)))
            })
            .boxed()
    }
}
like image 102
Mathieu Avatar answered Dec 29 '22 01:12

Mathieu


The answer by Mathieu is correct. So I'll explain the reason behind the error.

The fundamental issue here is when you're returning UsersStream you're moving data out of the function fn users(..). Now the caller of the function users(..) owns the returned data and (theoretically) can do whatever it wants to do, including keeping the data for the lifetime of the application, i.e. giving the data 'static lifetime. But if UsersStream has a reference to a data (context.pool) with a limited lifetime, what'll happen when the owner of pool drops the data? It'll refer to nullpointer as data is dropped. So the compiler throws errors to prevent such a situation.

So what you can do here is to somehow pass the owned data(pool) instead of reference, ensuring that pool has the same lifetime as UsersStream as it owns the data now. clone() does exactly this, it creates an owned copy(either reference-counted or byte copy) of the data.

like image 29
Sarowar Avatar answered Dec 29 '22 02:12

Sarowar