cksac / dataloader-rs

Rust implementation of Facebook's DataLoader using async-await.
Apache License 2.0

How to use it with Juniper? #10

Closed acelot closed 4 years ago

acelot commented 4 years ago

Before Rust 1.39, I successfully used this library. But with the advent of async/await, I was completely confused. Below is an example of my broken code:

#[juniper::object(Context = Context)]
impl Article {
    fn id(&self) -> i32 {
        self.id
    }

    fn title(&self) -> &str {
        self.title.as_str()
    }

    async fn authors(&self, context: &Context) -> FieldResult<Vec<Author>> {
        let authors = context.authors_loader().load_many(self.author_ids.clone());
        Ok(authors.await)
    }
}

I'm trying to load the authors of the article with the dataloader, but I got confused about the return types and how to get the authors out of it. The compiler gives me these errors:

error[E0728]: `await` is only allowed inside `async` functions and blocks
  --> src/schema.rs:52:12
   |
40 | #[juniper::object(Context = Context)]
   | ------------------------------------- this is not `async`
...
52 |         Ok(authors.await)
   |            ^^^^^^^^^^^^^ only allowed inside `async` functions and blocks

error[E0308]: mismatched types
  --> src/schema.rs:52:12
   |
52 |         Ok(authors.await)
   |            ^^^^^^^^^^^^^ expected struct `std::vec::Vec`, found enum `std::result::Result`
   |
   = note: expected type `std::vec::Vec<_>`
              found type `std::result::Result<std::vec::Vec<_>, dataloader::LoadError<()>>`

error: aborting due to 2 previous errors
cksac commented 4 years ago

For error[E0308]: mismatched types, I think you need to handle the loader's return type, which is a Result.

1) Using .await? does not seem possible with dataloader version 0.6:

error[E0277]: `dataloader::LoadError<()>` doesn't implement `std::fmt::Display`
  --> src/main.rs:40:40
   |
40 |         Ok(context.loader.load(1).await?)
   |                                        ^ `dataloader::LoadError<()>` cannot be formatted with the default formatter
   |
   = help: the trait `std::fmt::Display` is not implemented for `dataloader::LoadError<()>`
   = note: in format strings you may be able to use `{:?}` (or {:#?} for pretty-print) instead
   = note: required because of the requirements on the impl of `std::convert::From<dataloader::LoadError<()>>` for `juniper::executor::FieldError`
   = note: required by `std::convert::From::from`

2) Use match instead:

#[juniper::object(
    Context = Ctx,
)]
impl Query {
    async fn favoriteEpisode(context: &Ctx) -> FieldResult<Episode> {
        match context.loader.load(1).await {
            Ok(e) => Ok(e),
            Err(_) => Err(FieldError::new(
                "field error from loader error",
                graphql_value!(None),
            )),
        }
    }
}
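
For reference, the same resolver can also be written without the match by mapping the loader error into a FieldError explicitly. This is only a sketch using the FieldError and graphql_value types already shown above; plain .await? won't work because dataloader::LoadError<()> does not implement Display:

    async fn favoriteEpisode(context: &Ctx) -> FieldResult<Episode> {
        context
            .loader
            .load(1)
            .await
            // LoadError<()> has no Display impl, so build the FieldError by hand
            // instead of relying on `?` / From.
            .map_err(|_| FieldError::new("field error from loader error", graphql_value!(None)))
    }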

For error[E0728] (await is only allowed inside async functions and blocks):

I get the same error as you when using juniper = "0.14.2". Which version are you using?

acelot commented 4 years ago

Thanks for the reply.

[[package]]
name = "juniper"
version = "0.14.2"

Using match:

    async fn authors(&self, context: &Context) -> FieldResult<Vec<Author>> {
        match context
            .authors_loader()
            .load_many(self.author_ids.clone())
            .await
        {
            Ok(a) => Ok(a),
            Err(e) => Err(FieldError::new(
                "Cannot load authors",
                graphql_value!({ "internal_error": "Connection refused" }),
            )),
        }
    }

Errors:

error[E0728]: `await` is only allowed inside `async` functions and blocks
  --> src/schema.rs:58:15
   |
47 |   #[juniper::object(Context = Context)]
   |   ------------------------------------- this is not `async`
...
58 |           match context
   |  _______________^
59 | |             .authors_loader()
60 | |             .load_many(self.author_ids.clone())
61 | |             .await
   | |__________________^ only allowed inside `async` functions and blocks
cksac commented 4 years ago

I ran cargo expand to check the generated code and found that no async functions are generated by juniper. I think async/await support has not landed in juniper = "0.14.2"; you may need to experiment with

juniper = { git = "https://github.com/graphql-rust/juniper", branch = "async-await", features = ["async"] }

https://github.com/graphql-rust/juniper/pull/461

acelot commented 4 years ago

Ok, I will try. Thank you for the help!

jayy-lmao commented 4 years ago

Ok, I will try. Thank you for the help!

I'm also currently in the process of implementing this in a Juniper server. I've managed to get it all hooked up, but the loader is still only being called with one key at a time. Have you been able to get the queries to batch? If so, how have you structured your resolvers?

Edit:

I thought I'd provide some code to clarify what I mean :grin:

I can use executor::block_on like this:

    fn person_by_id(context: &Context, id: i32) -> Person {
        let res = person_by_id(context, id);
        executor::block_on(res)
    }

but then it seems the dataloader only loads ids one at a time; block_on is probably forcing them to run in sequence. If I try to make things asynchronous, as in acelot's example above:

async fn person_by_id(ctx: &Context, id: i32) -> Person {
    let res = ctx.person_loader.load(id);
    res.await.unwrap()
}

#[juniper::graphql_object(Context = Context)]
impl Query {
    async fn users(context: &Context, limit: Option<i32>) -> Vec<i32> {
        let vec = vec![1, 2, 3, 4];
        vec
    }
    async fn person_by_id(context: &Context, id: i32) -> Person {
        person_by_id(context, id).await
    }
}

then I run into the following error:

panicked at 'Tried to resolve async field users on type Some("Query") with a sync resolver', src/graphql/model.rs:70:1

My aim is to contribute an example using this dataloader for https://github.com/graphql-rust/juniper/issues/444 :smile:

acelot commented 4 years ago

@jayy-lmao Hello! I am not very strong in Rust. I learn a language by stepping on a rake :)

About my project structure:

Server:

let schema = Arc::new(create_schema());
let ctx = Arc::new(Context::new(
    pool.clone(),
    Loader::new(AuthorsLoader::new(pool.clone())),
));

HttpServer::new(move || {
    App::new()
        .data(schema.clone())
        .data(ctx.clone())
        .wrap(middleware::Logger::default())
        .service(web::resource("/").route(web::post().to(graphql)))
})
.bind(SocketAddr::new(
    IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
    listen_port,
))
.expect("Failed to start the web server")
.run()
.await

GraphQL handler:

async fn graphql(
    st: web::Data<Arc<Schema>>,
    ctx: web::Data<Arc<Context>>,
    gqr: web::Json<GraphQLRequest>,
) -> impl Responder {
    let res = gqr.execute_async(&st, &ctx).await;
    let body = serde_json::to_string(&res)?;
    Ok::<_, serde_json::error::Error>(
        HttpResponse::Ok()
            .content_type("application/json")
            .body(body),
    )
}

Dataloader:

impl BatchFn<i32, Author> for AuthorsLoader {
    type Error = ();

    fn load(&self, keys: &[i32]) -> BatchFuture<Author, Self::Error> {
        println!("load batch {:?}", keys);

        let mut c = self.db.get().unwrap();

        let rows = c.query(r#"
            SELECT id, "fullName"
            FROM articles.authors 
            WHERE id = ANY($1)
        "#, &[&keys]).unwrap();

        let items = rows.iter().map(|row| Author {
            id: row.get("id"),
            full_name: row.get("fullName"),
        });

        future::ready(items.collect())
            .unit_error()
            .boxed()
    }
}
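
One thing to watch: the rows come back in whatever order Postgres returns them. If the batch function is expected to return values in the same order as the keys, the result can be reindexed first. A rough sketch (assuming every requested id has a matching author; the helper name is hypothetical):

use std::collections::HashMap;

// Reorder the fetched authors to match the order of `keys` (a sketch; assumes
// every requested id is present in the result set).
fn order_authors_by_keys(keys: &[i32], authors: Vec<Author>) -> Vec<Author> {
    let mut by_id: HashMap<i32, Author> =
        authors.into_iter().map(|a| (a.id, a)).collect();
    keys.iter()
        .map(|k| by_id.remove(k).expect("missing author for key"))
        .collect()
}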

Resolver:

#[juniper::graphql_object(Context = Context)]
impl Article {
    fn id(&self) -> i32 {
        self.id
    }

    fn title(&self) -> &str {
        self.title.as_str()
    }

    async fn authors(&self, context: &Context) -> FieldResult<Vec<Author>> {
        match context
            .authors_loader()
            .load_many(self.author_ids.clone())
            .await
        {
            Ok(a) => Ok(a),
            Err(_) => Err(FieldError::new(
                "Cannot load authors",
                graphql_value!({ "internal_error": "Some error" }),
            )),
        }
    }
}

The project now compiles successfully, but it panics on the first request:

thread 'actix-rt:worker:0' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.', /home/pioneer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.9/src/runtime/enter.rs:19:5
jayy-lmao commented 4 years ago

@jayy-lmao Hello! I am not very strong in Rust. I learn a language by stepping on a rake :)

Same here.

It looks like our approaches are very similar; my error is also occurring on the first request. The two repos I'm currently using as references are:

  1. https://github.com/actix/examples/tree/master/juniper/src
  2. https://github.com/clifinger/canduma/

That block_on error is frustrating... maybe it's to do with your awaits, as that's the only thing I can think of that behaves like a block_on :thinking:

Edit: If you use executor::block_on instead of .await, it will work on juniper 0.14.2, but then it won't batch requests :grimacing:

acelot commented 4 years ago

I have changed my GraphQL handler to use the web::block helper, and now I'm getting the same error:

Tried to resolve async field authors on type Some("Article") with a sync resolver
jayy-lmao commented 4 years ago

I have changed my GraphQL handler to use the web::block helper, and now I'm getting the same error:

Tried to resolve async field authors on type Some("Article") with a sync resolver

Well, looks like we're both stuck in the same place! I will let you know if I find a way around it :rofl:

Edit: p.s. it looks like https://github.com/graphql-rust/juniper/pull/461 has landed now, maybe there will be a 0.14.3 soon

arn22433 commented 4 years ago

Hello, I was stuck on the same issue and managed to find a solution based on the feedback posted above.

As I haven't found much documentation on this issue, I'll give a detailed explanation. This is my first project in Rust, so you might see some ugly stuff. Feedback is more than welcome :).

First, I'm using the async version of Juniper as mentioned above:

juniper = { git = "https://github.com/graphql-rust/juniper", branch = "async-await", features = ["async"] }

In this version, juniper::object is renamed to juniper::graphql_object.

Second, I define a dataloader. I'm using mysql for database queries.

#[derive(Clone)]
pub struct RankDataLoader {
    pool: Arc<mysql::Pool>,
}

impl RankDataLoader {
    pub fn new(pool: Arc<mysql::Pool>) -> RankDataLoader {
        RankDataLoader { pool }
    }

    fn fetch_player_ranks(&self, keys: &[String]) -> Vec<Vec<PlayerRank>> {
        // Build a query with a WHERE x IN (keys) clause
        // Interpret the rows as PlayerRank objects
        // Build a Vec of PlayerRank for every key in keys
        // Return a Vec of Vecs of PlayerRank in the same order as keys
    }
}

impl BatchFn<String, Vec<PlayerRank>> for RankDataLoader {
    type Error = ();

    fn load(&self, keys: &[String]) -> BatchFuture<Vec<PlayerRank>, Self::Error> {
        println!("load batch {:?}", keys);

        future::ready(self.fetch_player_ranks(keys))
            .unit_error()
            .boxed()
    }
}
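
For reference, the grouping step described in the comments of fetch_player_ranks could be implemented roughly like this (a sketch; it assumes a hypothetical query step that yields the matching rows as (player_id, PlayerRank) pairs):

use std::collections::HashMap;

// Group the fetched rows per key and return the groups in the same order
// as `keys` (keys with no rows get an empty Vec).
fn group_ranks_by_key(keys: &[String], rows: Vec<(String, PlayerRank)>) -> Vec<Vec<PlayerRank>> {
    let mut grouped: HashMap<String, Vec<PlayerRank>> = HashMap::new();
    for (player_id, rank) in rows {
        grouped.entry(player_id).or_default().push(rank);
    }
    keys.iter()
        .map(|k| grouped.remove(k).unwrap_or_default())
        .collect()
}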

I then define the context that will be passed with every GraphQL query:

#[derive(Clone)]
pub struct Context {
    pub pool: Arc<mysql::Pool>,
    pub rank_data_loader: dataloader::cached::Loader<
        String,
        Vec<PlayerRank>,
        (),
        RankDataLoader,
        std::collections::BTreeMap<
            String,
            dataloader::cached::CacheItem<
                Result<Vec<PlayerRank>, dataloader::LoadError<()>>,
                dataloader::non_cached::LoadFuture<String, Vec<PlayerRank>, (), RankDataLoader>,
            >,
        >,
    >,
}

impl Context {
    pub fn new(pool: Arc<mysql::Pool>) -> Context {
        Context {
            pool: pool.clone(),
            rank_data_loader: Loader::new(RankDataLoader::new(pool.clone())).cached(),
        }
    }
}

impl juniper::Context for Context {}

And then I define a ContextFactory to be able to build a new context for every query.

pub trait ContextFactory {
    type Context;
    fn new_context(&self) -> Self::Context;
}

#[derive(Clone)]
pub struct MyContextFactory {
    pub pool: Arc<mysql::Pool>,
}

impl ContextFactory for MyContextFactory {
    type Context = Context;
    fn new_context(&self) -> Self::Context {
        Context::new(self.pool.clone())
    }
}

There are probably better ways to write this (in particular the type of rank_data_loader in Context 😄 ).
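
One purely cosmetic way to tame that field is to name the loader type once with a type alias; this is the same type as in Context above, just written in one place:

// The same cached loader type used in Context, named once so the struct
// field can simply be `pub rank_data_loader: RankLoader`.
pub type RankLoader = dataloader::cached::Loader<
    String,
    Vec<PlayerRank>,
    (),
    RankDataLoader,
    std::collections::BTreeMap<
        String,
        dataloader::cached::CacheItem<
            Result<Vec<PlayerRank>, dataloader::LoadError<()>>,
            dataloader::non_cached::LoadFuture<String, Vec<PlayerRank>, (), RankDataLoader>,
        >,
    >,
>;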

I'm using warp to serve a graphiql instance. My code is based on a blog post. Here's the function that executes the GraphQL request, which I had to modify:

pub fn make_graphql_filter<Query, Mutation, ContextBuilder, ContextT>(
    path: &'static str,
    schema: juniper::RootNode<'static, Query, Mutation>,
    context_factory: ContextBuilder,
) -> BoxedFilter<(impl warp::Reply,)>
where
    ContextBuilder: ContextFactory<Context = ContextT>,
    ContextT: juniper::Context + Send + Sync + Clone + 'static,
    Query: juniper::GraphQLTypeAsync<juniper::DefaultScalarValue, Context = ContextT, TypeInfo = ()>
        + Send
        + Sync
        + 'static,
    Mutation: juniper::GraphQLTypeAsync<juniper::DefaultScalarValue, Context = ContextT, TypeInfo = ()>
        + Send
        + Sync
        + 'static,
{
    let schema = Arc::new(schema);
    let context = context_factory.new_context();

    let handle_request =
        move |request: juniper::http::GraphQLRequest| -> Result<Vec<u8>, serde_json::Error> {
            let mut rt = futures::executor::LocalPool::new();
            let f = request.execute_async(&schema, &context);
            let res = rt.run_until(f);
            serde_json::to_vec(&res)
        };

    warp::post2()
        .and(warp::path(path.into()))
        //        .and(context_extractor)
        .and(warp::body::json())
        .map(handle_request)
        .map(build_response)
        .boxed()
}

Again, I'm very new to Rust, so this could probably be written better. If you notice something wrong, please let me know.

Finally, the async keyword can be added to the resolvers, and the resolver using the dataloader looks like this:

pub async fn fetch_player_ranks(id: &String, ctx: &Context) -> Vec<PlayerRank> {
    ctx.rank_data_loader.load(String::from(id)).await.unwrap()
}
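
Hooked into a resolver, this would look roughly like acelot's Article example above (a sketch; the Player type and its id field are assumptions):

#[juniper::graphql_object(Context = Context)]
impl Player {
    // Resolve a player's ranks through the dataloader helper above.
    async fn ranks(&self, context: &Context) -> Vec<PlayerRank> {
        fetch_player_ranks(&self.id, context).await
    }
}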

I hope this will be helpful to both of you.


Edit: I decided to switch to Actix as I find it far more intuitive. Performance also seems to be slightly better.

Here's the function to execute a GraphQL query:

async fn graphql(
    st: web::Data<Arc<Schema>>,
    context_factory: web::Data<Arc<MyContextFactory>>,
    data: web::Json<GraphQLRequest>,
) -> impl Responder {
    let context = context_factory.new_context();
    let res = data.execute_async(&st, &context).await;
    let body = serde_json::to_string(&res)?;
    Ok::<_, serde_json::error::Error>(
        HttpResponse::Ok()
            .encoding(ContentEncoding::Gzip)
            .content_type("application/json")
            .body(body),
    )
}
arn22433 commented 4 years ago

The library does not appear to batch properly when using multiple dataloaders. It fails to batch most of the time (sometimes it batches 2 or 3 keys together, but most keys are loaded individually).

I assume this is an issue with the scheduling of the futures' execution. I managed to fix it by adding a delay to the execution of the futures: I simply modified line 62 of non_cached.rs to use a delay of 1 ms instead of none.

    pub fn load(&self, key: K) -> LoadFuture<K, V, E, F>
    where
        E: Clone,
    {
        let id = {
            let mut st = self.state.lock().unwrap();
            let id = st.new_unique_id();
            st.queued_keys.insert(id, key);
            id
        };
        LoadFuture {
            id,
            // delay: None,
            delay: Some(Delay::new(Duration::from_millis(1))),
            stage: Stage::Created,
            state: self.state.clone(),
        }
    }

This is a very poor way of fixing it, but it seems to work. Maybe someone with more experience with Rust can find a proper way of fixing this.


Edit: Looking further into the source code, execution appears to be delayed already, so I don't really know why the delay I added made a difference.

// Skipping first poll for LoadFuture allows to defer a batch loading,
// and collect more keys for the batch.
if let Stage::Created = self.stage {
    self.stage = Stage::Polled;
}
// Reschedule this LoadFuture execution to the end of event loop.
// This allows to defer the actual loading much better, as leaves space
// for more event loop ticks to happen before doing actual loading,
// where new keys may be enqueued.
self.delay = Some(Delay::new(Duration::from_nanos(1)));
let _ = ready!(self.delay.as_mut().unwrap().poll_unpin(cx));
// If Delay Future is somehow ready instantly (normally, this should not
// happen), then defer this LoadFuture execution with Waker, which is
// not as good as Delay Future in deferring, but is something at least.
cx.waker().wake_by_ref();
jayy-lmao commented 4 years ago

@arn22433

let res = data.execute_async(&st, &context).await;

that execute_async is exactly what @acelot and I were missing! Thank you so much 🥇

Also, thanks for the warning about the batching issue. I did a test project where this seemed to happen intermittently, and I wasn't quite sure what was going on 🤔

Edit:

db_1   | 2020-01-28 09:13:08.947 UTC [66] LOG:  execute <unnamed>: SELECT person_id, person_name, cult FROM persons WHERE person_id = ANY($1)
db_1   | 2020-01-28 09:13:08.947 UTC [66] DETAIL:  parameters: $1 = '{1,2}'

Almost brings tears to my eyes :')

jayy-lmao commented 4 years ago

Just a plug for my demo repo, in case anyone reading this issue in the future wants a reference:

https://github.com/jayy-lmao/rust-graphql-docker