**Closed** — sebastianthelen closed this issue 5 years ago
I'd love to see an example too!
Just an idea: you could put the DataLoaders on `info.context` (which is actually the current request).
Not sure what framework you're using, but with Flask I think the approach that we're going to take is to instantiate our dataloaders and attach them to the `g` object in a `before_request` handler, and then delete them in `teardown_request`. Basically combining some notes from the DataLoader docs here https://github.com/syrusakbary/aiodataloader#creating-a-new-dataloader-per-request and the Flask docs here http://flask.pocoo.org/docs/1.0/patterns/deferredcallbacks/#deferred-request-callbacks.
Avery has the correct approach; the Flask request context also jibes with the Sanic example in aiodataloader. Closing, please comment if you feel this needs reopening.
I too am using Graphene + Flask-GraphQL + DataLoader (specifically AIODataLoader, with the `AsyncioExecutor`), and I am now trying to bind my DataLoaders to the request's lifecycle. My unfamiliarity with Flask/asyncio is leaving me with a piece of this puzzle still missing.

Could @averypmc, @sebastianthelen, or others provide a more complete example? Piecing together the links here and in other Flask/Graphene/DataLoader docs, I have something like...
```python
# project/loaders.py
from project.db import models
from aiodataloader import DataLoader

class UserLoader(DataLoader):
    async def batch_load_fn(self, ids):
        items = models.db_session.query(models.User).filter(models.User.id.in_(ids))
        item_dict = {}
        for x in items:
            item_dict[x.id] = x
        # Reorder items to match incoming id order
        return [item_dict.get(id) for id in ids]
```
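The final `return` line is the one invariant every `batch_load_fn` must uphold: results line up one-to-one with the incoming ids, with `None` for misses. Pulled out as a standalone helper (hypothetical name `reorder_by_ids`, not part of any library), the pattern looks like this:

```python
from typing import Callable, Iterable, List, Optional, TypeVar

T = TypeVar("T")
K = TypeVar("K")

def reorder_by_ids(items: Iterable[T], ids: List[K],
                   key: Callable[[T], K]) -> List[Optional[T]]:
    """Return items reordered to match ids, with None for ids not found."""
    by_id = {key(item): item for item in items}
    return [by_id.get(i) for i in ids]
```

The loader above could then end with `return reorder_by_ids(items, ids, key=lambda u: u.id)`.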
and
```python
# project/api.py
from flask import Flask, g
from flask_graphql import GraphQLView
from graphql.execution.executors.asyncio import AsyncioExecutor

from project.schema import schema
from project.loaders import UserLoader

app = Flask(__name__)

@app.before_request
def construct_dataloaders():
    g.dataloaders = {'user_loader': UserLoader()}

app.add_url_rule('/graphql', view_func=GraphQLView.as_view(
    'graphql', schema=schema, graphiql=True, context={}, executor=AsyncioExecutor()))

@app.teardown_appcontext
def teardown_loaders(exception=None):
    g.pop('dataloaders', None)

if __name__ == '__main__':
    app.run()
```
and
```python
# project/schema.py
import graphene

class User(graphene.ObjectType):
    id = graphene.ID(required=True)
    email = graphene.String()
    first_name = graphene.String()
    last_name = graphene.String()

class Query(graphene.ObjectType):
    user = graphene.Field(User, id=graphene.ID(required=True))

    async def resolve_user(self, info, id):
        return await <SOMETHING>['user_loader'].load(id)

schema = graphene.Schema(query=Query)
```
Previously I was allowing the DataLoader instances to be bound to the application lifecycle, and everything seemed to work fine.

When I attempt to run this and make a request, I get `RuntimeError: There is no current event loop in thread 'Thread-2'` at the point where it hits the line `g.dataloaders = {'user_loader': UserLoader()}` in the `@app.before_request` handler in `api.py`. I'm guessing I need to set up an asyncio event loop for the Flask process itself and pass that to the `AsyncioExecutor` via its `loop` param or something, but how exactly do I do that?

Also, what goes in the `<SOMETHING>` in my resolver in `schema.py`? Is it `g.dataloaders` (assuming I import `g` from `flask` there)?
Thanks in advance for any help :smiley:
I created a new event loop and passed it to both the `AsyncioExecutor` and my `DataLoader` (the `loop` param to `aiodataloader`'s `DataLoader` constructor isn't in the readme anywhere, but it's set here).
```python
# project/api.py
import asyncio

from flask import Flask, g
from flask_graphql import GraphQLView
from graphql.execution.executors.asyncio import AsyncioExecutor

from project.schema import schema
from project.loaders import UserLoader

app = Flask(__name__)

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

@app.before_request
def construct_dataloaders():
    g.dataloaders = {'user_loader': UserLoader(loop=loop)}

app.add_url_rule('/graphql', view_func=GraphQLView.as_view(
    'graphql', schema=schema, graphiql=True, context={}, executor=AsyncioExecutor(loop=loop)))

@app.teardown_appcontext
def teardown_loaders(exception=None):
    g.pop('dataloaders', None)

if __name__ == '__main__':
    app.run()
```
This seems to be "working", but can anyone confirm if this is valid usage? From logging output, it appears my `DataLoader`'s `__init__` is indeed being called again with each new request. My concern is around sharing the same event loop between what appear to be different threads, which I don't think I'm supposed to be doing, since `aiodataloader` uses `call_soon()` instead of `call_soon_threadsafe()`.
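For what it's worth, one way to sidestep the `call_soon()` thread-safety worry is to confine the loop to a single dedicated thread and have the worker threads submit coroutines with `asyncio.run_coroutine_threadsafe`, which is the documented thread-safe entry point. A minimal sketch, not graphene-specific (`load_user` is an illustrative placeholder for an aiodataloader `.load()` call):

```python
import asyncio
import threading

# Run one event loop forever on a dedicated background thread.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def load_user(user_id):
    # Placeholder for e.g. `await user_loader.load(user_id)`
    return {"id": user_id}

def load_user_blocking(user_id):
    # Safe to call from any request-handling thread: the coroutine is
    # scheduled onto the loop thread, and we block on the result here.
    future = asyncio.run_coroutine_threadsafe(load_user(user_id), loop)
    return future.result(timeout=5)
```

With this layout only the loop thread ever touches the loop directly, so `call_soon()` inside aiodataloader always runs on the thread that owns the loop.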
Is it possible to get this issue re-opened?
+1, can't find anything related 2 years later
I ran into this issue as well with graphene v2.1.8 and tried to implement the solution above. I didn't end up getting aiodataloader to work well without a lot of hacks on the event loop (mainly running into issues with the main thread blocking). So I ended up using `promise.DataLoader`, which the graphene documentation seems to recommend.

I tried constructing dataloaders using `app.before_request`, which works as the prior comments suggest. However, we have some internal Flask routes that don't need dataloaders constructed on every new request context. So I ended up overriding `get_context`, which seems like a fine approach without needing to do something like `g.dataloaders = None` in the app context teardown. This is based on the idea that for each new request context, dataloaders are instantiated and cached only for that request when passed through `info.context`.
Here's some sample code of this working:
```python
# dataloaders.py
from collections import defaultdict

from promise import Promise
from promise.dataloader import DataLoader

from some.internal.library import UnitQuery

class UnitSamplesLoader(DataLoader):
    def batch_load_fn(self, ids):
        samples = UnitQuery.get_samples(unit_ids=ids)  # bulk query we wrote
        samples_dict = defaultdict(list)
        for sample in samples:
            samples_dict[sample.unit_id].append(sample)
        return Promise.resolve([samples_dict.get(unit_id, []) for unit_id in ids])
```
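The grouping step here is the one-to-many counterpart of the usual id-reordering contract: collect children under each parent key, then emit lists in the incoming key order, with an empty list for parents that have no children. As a standalone sketch (hypothetical helper name, not from any library):

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, TypeVar

T = TypeVar("T")
K = TypeVar("K")

def group_by_parent(items: Iterable[T], ids: List[K],
                    key: Callable[[T], K]) -> List[List[T]]:
    """Group items by parent key, ordered to match ids; [] when no children."""
    grouped: Dict[K, List[T]] = defaultdict(list)
    for item in items:
        grouped[key(item)].append(item)
    return [grouped.get(i, []) for i in ids]
```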
```python
# context.py
from typing import Dict

from promise.dataloader import DataLoader

from dataloaders import UnitSamplesLoader

def construct_dataloaders() -> Dict[str, DataLoader]:
    # Here, we looped through our registry to figure out which dataloaders to instantiate.
    dataloaders = {"unit__samples__loader": UnitSamplesLoader()}  # type: Dict[str, DataLoader]
    return dataloaders

def get_graphql_context() -> Dict[str, Dict[str, DataLoader]]:
    return {"dataloaders": construct_dataloaders()}
```
Note: in the v3 graphql-server port, you can probably use the `context` parameter instead of overriding `get_context`.
```python
# app.py
from flask_graphql import GraphQLView

from context import get_graphql_context

graphql_view = GraphQLView.as_view(
    "graphql",
    schema=schema,
    middleware=middleware,
    get_context=get_graphql_context,
)
```
Then, you can resolve the dataloaders like this:
```python
# schema.py
import graphene

class Unit(graphene.ObjectType):
    def resolve_samples_loader(self, info, **kwargs):
        dataloaders = info.context["dataloaders"]
        return dataloaders["unit__samples__loader"].load(self.id)
```
I have a solution that is quite similar to what @fangherk proposed. The difference is that I'm using contextvars directly (instead of going through `info.context.x`). Using `ContextVar` would allow your team to simply call `get_loader("loader_name")`, like so:
```python
from graphene import relay
from graphene_sqlalchemy import SQLAlchemyObjectType

from helpers.context.dataloaders import get_loader
from db import models

class Comment(SQLAlchemyObjectType):
    class Meta:
        model = models.Comment
        interfaces = (relay.Node,)

    def resolve_client(review: models.Comment, info):
        return get_loader("user").load(review.user_id)
```
In an ASGI app, each incoming request runs in its own task; therefore each query execution can have its own private context (natively), which means you get a free and, I feel, better way of passing request-based context. With this strategy you can access the data loaders anywhere, even if you don't have access to `info.context`.
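The per-task isolation this relies on is a property of `contextvars` itself: asyncio runs each task in a copy of the context that was current when the task was created, so a `ContextVar.set()` inside one task is invisible to its siblings. A minimal stdlib demonstration:

```python
import asyncio
from contextvars import ContextVar

request_id: ContextVar[str] = ContextVar("request_id", default="none")

async def handle(rid: str) -> str:
    # Each "request" task sets its own value...
    request_id.set(rid)
    await asyncio.sleep(0)  # yield so the tasks interleave
    # ...and still reads back its own value afterwards.
    return request_id.get()

async def main() -> list:
    # Each task created by gather() gets its own context copy.
    return await asyncio.gather(handle("r1"), handle("r2"), handle("r3"))

results = asyncio.run(main())
```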
When we get a new query, we attach all loaders (we could attach them individually only when required, but I'm not sure that's worth the effort):
```python
from starlette.graphql import GraphQLApp as BaseGraphQLApp

from helpers.context import dataloaders

class GraphQLApp(BaseGraphQLApp):
    async def execute(self, query, context=None, **kwargs):
        dataloaders.initialize_from_request_context(context)
        return await super().execute(query, context=context, **kwargs)
```
The code that creates data loaders:
```python
# db/dataloaders.py
from typing import Dict, List

from aiodataloader import DataLoader

from db import crud, models

def create_loaders() -> Dict[str, DataLoader]:
    return {
        "user": DataLoader(batch_user),
    }

async def batch_user(keys: List[str]) -> List[models.User]:
    users = {u.user_id: u for u in crud.users(keys)}
    return [users.get(user_id) for user_id in keys]
```
The code that can set/get loaders' context:
```python
# helpers/context/dataloaders.py
from contextvars import ContextVar
from typing import Dict

from aiodataloader import DataLoader

from db.dataloaders import create_loaders

dataloaders_context: ContextVar[Dict[str, DataLoader]] = ContextVar(
    "loaders", default=dict()
)

def initialize_from_request_context(context):
    dataloaders = create_loaders()
    dataloaders_context.set(dataloaders)
    return dataloaders

def get_loader(loader_name: str):
    dataloaders = dataloaders_context.get()
    return dataloaders.get(loader_name)
```
In practice you would probably pass your current user ID into `create_loaders` so you can attach it to loaders that may require it. But once again, you could simply use a per-request context to share the current user (via `ContextVar`).
Hi,
there seems to be some discussion about the best way to use dataloader objects (see https://github.com/facebook/dataloader/issues/62#issue-193854091). The general question is whether dataloader objects should be used as application-level caches or rather at the request level.

My current implementation is based on https://docs.graphene-python.org/en/latest/execution/dataloader/ where dataloaders seem to be used as application-level caches. The nice thing about this is that requests can benefit from what has already been cached by previous requests. However, I'm struggling with how to invalidate my dataloader in case the data in the repository changes. It occurred to me that such issues could be prevented by moving the dataloader to the request level as suggested (sure, cached data would not be shared between requests anymore). Unfortunately, it is not clear to me how to do this based on the example in the documentation, because the request itself is not explicitly represented.
Can someone provide a small example that uses graphene + flask-graphql?
Cheers, Sebastian