graphql-python / graphql-core-legacy

GraphQL base implementation for Python (legacy version – see graphql-core for the current one)
MIT License

Subscription must return Async Iterable or Observable. Received: #149

Open japrogramer opened 6 years ago

japrogramer commented 6 years ago

Hello, I previously got subscriptions working, crudely: https://github.com/graphql-python/graphene/pull/500#issuecomment-325560994

Now I want to try the new version of the graphql-core library, and here is how far I've gotten. It is not clear to me what an Async Iterable is meant to be. I assumed it means an async generator?

But from this line it seems the result must be an instance of Observable: https://github.com/graphql-python/graphql-core/blob/0b0551e7dfb724dedc66759145b9221595ec8598/graphql/execution/executor.py#L296 Is there any documentation for graphql-core's new approach to subscriptions? I would very much like to learn more.

Here is what I'm trying:

[GraphQLError('Subscription must return Async Iterable or Observable. Received: <async_generator object ProductSubscritption.resolve_sub_product.<locals>.compat at 0x7f6335e89d48>',)]

    def resolve_sub_product(self, info, **input):
        async def compat(result, delay):
            yield result
            await asyncio.sleep(delay)
        return compat(make_sub(info, input.get('product')), .1)
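
For reference, an async generator does satisfy Python's own definition of an async iterable, and that can be checked with the standard library alone. The sketch below uses a hypothetical `ticker` generator. It says nothing about what graphql-core itself accepts, only what Python means by the term.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator


async def ticker(values, delay):
    """Hypothetical async generator: yields each value, pausing in between."""
    for value in values:
        yield value
        await asyncio.sleep(delay)


async def collect(source):
    # `async for` works on any object that implements __aiter__.
    return [item async for item in source]


gen = ticker(["a", "b"], 0.01)
is_async_iterable = isinstance(gen, AsyncIterable)   # has __aiter__
is_async_iterator = isinstance(gen, AsyncIterator)   # also has __anext__
items = asyncio.run(collect(gen))
```

So if the executor rejects such an object, the cause is the executor's own type check rather than the generator failing to be an async iterable.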
japrogramer commented 6 years ago

Hmm, I think I understand now. In on_next I could also send to the reply channel in Channels. My question now is: will it block other normal requests from being handled in Django? I also need to figure out how to connect Django signals with this new mechanism. Any ideas?

Currently I'm adding the reply channel to a group and returning the query. With this new layout, would it even be necessary to add the reply_channel to a group? I need to keep reading.

        result = schema.execute(query, variable_values=foovar, allow_subscriptions=True, **kwargs)
        if isinstance(result, rx.Observable):
            class MyObserver(rx.Observer):

                def on_next(self, x):
                    nonlocal result
                    result = x

                def on_error(self, e):
                    ...

                def on_completed(self):
                    ...
            result.subscribe(MyObserver())
        data = result.data
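
One thing to keep in mind with the `nonlocal result` approach: it only sees values emitted before `subscribe` returns, and each new value overwrites the previous one. The sketch below shows that with a tiny stand-in observable. `ListObservable` and `LastValueObserver` are hypothetical and are not part of RxPY; a real `rx.Observable` may emit later or from another thread.

```python
class ListObservable:
    """Stand-in for an observable that emits a fixed list synchronously."""

    def __init__(self, values):
        self._values = list(values)

    def subscribe(self, observer):
        for value in self._values:
            observer.on_next(value)
        observer.on_completed()


class LastValueObserver:
    """Keeps only the most recent value, like `result = x` in on_next."""

    def __init__(self):
        self.last = None
        self.seen = []
        self.completed = False

    def on_next(self, value):
        self.last = value          # overwrites whatever was stored before
        self.seen.append(value)    # keep history to show what was dropped

    def on_error(self, error):
        raise error

    def on_completed(self):
        self.completed = True


observer = LastValueObserver()
ListObservable(["first", "second", "third"]).subscribe(observer)
```

After `subscribe` returns, `observer.last` holds only the final item, which is why forwarding each value from inside `on_next` is usually a better fit for a long-lived subscription.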
perrosnk commented 6 years ago

+1

AgentChris commented 6 years ago

can you give us the complete code, to play with it?

japrogramer commented 6 years ago

Most of it is in https://github.com/graphql-python/graphene/pull/500#issuecomment-325560994

This is the part that makes it rx compliant:

def make_sub(info, gid):
    inst = relay.Node.get_node_from_global_id(info, gid)
    try:
        gp_name = 'gqp.{0}-updated.{1}'.format(str.lower(inst.__class__.__name__), inst.pk)
        Group(gp_name).add(info.context.reply_channel)
        info.context.channel_session['Groups'] = ','.join(
            (gp_name, info.context.channel_session['Groups']))
    except:
        pass
    return iter([inst])

    def resolve_sub_product(self, info, **input):
        return rx.Observable.from_iterable(make_sub(info, input.get('product')))
eamigo86 commented 6 years ago

Hello @japrogramer, any progress returning an Async Iterable or Observable in the subscriptions?

japrogramer commented 6 years ago

@eamigo86 yes: return rx.Observable.from_iterable( ... ) from the resolve method. I explain more in the comment above.

eamigo86 commented 6 years ago

Hi again @japrogramer, my problem is that the subscription always gives me back a promise with the observable inside. In this image you can see that the function resolve_or_error, called within subscribe_field in execution/executor.py, always gives me back a promise. That's why I get the error.

image

I use the default SyncExecutor executor. When I debug its execute function, I get this: image

Any idea how to fix it?

japrogramer commented 6 years ago

@eamigo86 what query are you running and what does the resolve for those fields return?

japrogramer commented 6 years ago

are you using a dataloader in any of those fields?

eamigo86 commented 6 years ago

No, I don't use a DataLoader. I just define my Subscription as an ObjectType class, and my subscription resolver function returned an ObjectType. Thanks to your code I now return an Observable, like this: image

japrogramer commented 6 years ago

OK, so when the schema returns a result it should be an instance of rx.Observable; then this code should help you out: https://github.com/graphql-python/graphql-core/issues/149#issuecomment-345086276

japrogramer commented 6 years ago

The way I'm doing subscriptions is by adding the reply_channel to a group (https://github.com/graphql-python/graphql-core/issues/149#issuecomment-350405781), and because I'm serving subscriptions over a websocket, when I send to the group it goes to the people subscribed. So I'm subscribing to events, and then in the client I determine whether I want to query for the data based on the events.

socket = new WebSocket("ws://" + window.location.host + "/gql");
socket.onmessage = function(e) {
    alert(e.data);
}
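// send() throws while the socket is still connecting, so wait for onopen
socket.onopen = function() {
    socket.send(JSON.stringify({query: 'subscription {\
  subProduct (uuid: "714a3f4f-4a21-4ff9-b058-f12ad6389f72"){\
    id,\
    disabled,\
    title,\
  }\
}'}));
}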
eamigo86 commented 6 years ago

I understand the way you use subscriptions. In my case, I use GraphQL subscriptions only to register a user connected by websocket to the server in a certain group, so that when the event they subscribed to occurs, they are notified via websocket, like this:

image

Everything works great, except that in version 2.0 of graphql-core an Async Iterable or an Observable must be returned, and I only return a confirmation that the subscription was made successfully.

eamigo86 commented 6 years ago

I only manage to obtain an Observable if I put in the settings:

GRAPHENE = {  
    'MIDDLEWARE': []
}
japrogramer commented 6 years ago

Is the graphene_django debug middleware causing problems? or what other middleware are you using?

japrogramer commented 6 years ago

These are my settings. I can confirm that the client does receive a message when the product it subscribes to is updated.

if 'MIDDLEWARE' in GRAPHENE:
    GRAPHENE['MIDDLEWARE'] += ['graphene_django.debug.DjangoDebugMiddleware', ]
else:
    GRAPHENE.update({'MIDDLEWARE': ['graphene_django.debug.DjangoDebugMiddleware', ]})
japrogramer commented 6 years ago

And then in my root Query:

    if settings.DEBUG:
        debug = graphene.Field(DjangoDebug, name='__debug')
    node = relay.Node.Field()
eamigo86 commented 6 years ago

Hi @japrogramer. That looks like my configuration, but whether I leave the middleware unspecified or add DjangoDebugMiddleware, it always returns a promise. Only when I explicitly set the MIDDLEWARE key to [] does it return the Observable. I guess I'll have to keep investigating a bit more to find out why it's not working as I expect. Anyway, thanks for your advice and your time.

eamigo86 commented 6 years ago

Hi again @japrogramer, could you show me the full implementation of your Observer class? After a few hours and some tricks, I managed to return an Observable regardless of the middleware configuration in settings.py.

japrogramer commented 6 years ago

@eamigo86 this is my observer class https://github.com/graphql-python/graphql-core/issues/149#issuecomment-345086276

From what I have seen of your subscriptions project, it seems to me that you are looking for a scheduler: https://youtu.be/jKqWMvdTuE8?t=17m42s

I think the ThreadPoolScheduler is the most appropriate for what you are trying to do.

japrogramer commented 6 years ago

@eamigo86 actually it looks more like this now:

def send(result, message):
    data = result.data
    message.reply_channel.send(
        {
            'text': str({'data': json.loads(json.dumps(data))})
        })


@channel_session
def ws_GQLData(message):
    clean = json.loads(message.content['text'])
    query = clean.get('query')
    foovar = clean.get('variables')
    message.dataloaders = DataLoaders(get_language())
    kwargs = {'context_value': message}
    #  TODO: Implement weight, can this query run for this user or is it too expensive <10-11-17> #
    #  TODO: Implement timeout mechanism <10-11-17> #
    result = schema.execute(query, variable_values=foovar, allow_subscriptions=True, **kwargs)
    if isinstance(result, rx.Observable):
        class MyObserver(rx.Observer):

            def on_next(self, x):
                send(x, message)

            def on_error(self, e):
                ...

            def on_completed(self):
                ...

        result.subscribe(MyObserver())
    else:
        send(result, message)

Note: In the process of refactoring this to work with apollo
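
A side note on `send`: `str()` of a dict produces Python's repr, which uses single quotes and spells null as `None`, so a client that tries to parse the message as JSON would likely fail. If the client expects JSON, calling `json.dumps` on the whole payload may be what is wanted. A quick comparison with a made-up payload (the `data` dict and the `is_valid_json` helper are illustrative only):

```python
import json

# Made-up payload, standing in for `result.data`.
data = {"subProduct": {"id": "1", "disabled": None, "title": "Widget"}}

as_repr = str({"data": data})         # Python repr, not JSON
as_json = json.dumps({"data": data})  # valid JSON text


def is_valid_json(text):
    """Return True if `text` parses as JSON."""
    try:
        json.loads(text)
        return True
    except ValueError:
        return False
```

Here `is_valid_json(as_repr)` is false while `is_valid_json(as_json)` is true, and the JSON form round-trips back to the original dict.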

dfee commented 6 years ago

If you're calling a sync resolve function for your subscription, you need to return an observable. If you're calling an async resolve function, you need to yield from an async generator.

Practically speaking, the former will likely require an rx.subjects.Subject (so that it can be subscribed to), and the latter will require you to implement some code that looks like this:

import asyncio

class ObservableAsyncIterable:
    def __init__(self, observable):
        self.disposable = None
        self.queue = asyncio.Queue()
        self.observable = observable

    def __aiter__(self):
        return self

    async def __anext__(self):
        type_, val = await self.queue.get()
        if type_ in ('E', 'C'):
            raise StopAsyncIteration()
        return val

    async def __aenter__(self):
        self.disposable = self.observable.subscribe(
            on_next=lambda val: self.queue.put_nowait(('N', val)),
            on_error=lambda exc: self.queue.put_nowait(('E', exc)),
            on_completed=lambda: self.queue.put_nowait(('C', None)),
        )
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        self.disposable.dispose()

async def resolve_mytype(root, info):
    observable = my_custom_subscribe_returning_observable()
    # if it were just a sync function we'd return observable, now.
    async with ObservableAsyncIterable(observable) as oai:
        async for i in oai:
            yield i
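
Note that the class above ends iteration on both error and completion, so an error from the observable is silently dropped. Below is a variation that re-raises the error instead, exercised against a stand-in observable so it runs with the standard library alone. `FakeObservable`, `FakeDisposable` and `drain` are hypothetical test doubles, not RxPY classes, and the queue is created in `__aenter__` on the assumption that this is safer across Python versions.

```python
import asyncio


class FakeDisposable:
    def __init__(self):
        self.disposed = False

    def dispose(self):
        self.disposed = True


class FakeObservable:
    """Test double: emits the given values, then an error or completion."""

    def __init__(self, values, error=None):
        self.values = values
        self.error = error

    def subscribe(self, on_next, on_error, on_completed):
        for value in self.values:
            on_next(value)
        if self.error is not None:
            on_error(self.error)
        else:
            on_completed()
        return FakeDisposable()


class ObservableAsyncIterable:
    def __init__(self, observable):
        self.disposable = None
        self.queue = None
        self.observable = observable

    def __aiter__(self):
        return self

    async def __anext__(self):
        kind, value = await self.queue.get()
        if kind == 'E':
            raise value              # surface the observable's error
        if kind == 'C':
            raise StopAsyncIteration
        return value

    async def __aenter__(self):
        self.queue = asyncio.Queue()  # created inside the running loop
        self.disposable = self.observable.subscribe(
            on_next=lambda val: self.queue.put_nowait(('N', val)),
            on_error=lambda exc: self.queue.put_nowait(('E', exc)),
            on_completed=lambda: self.queue.put_nowait(('C', None)),
        )
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        self.disposable.dispose()


async def drain(observable):
    """Collect everything the bridge yields; record an error if one occurs."""
    received = []
    try:
        async with ObservableAsyncIterable(observable) as oai:
            async for item in oai:
                received.append(item)
    except ValueError as exc:
        received.append('error: {}'.format(exc))
    return received, oai.disposable.disposed


ok_items, ok_disposed = asyncio.run(drain(FakeObservable([1, 2, 3])))
bad_items, bad_disposed = asyncio.run(
    drain(FakeObservable([1], error=ValueError('boom'))))
```

The stand-in emits synchronously on the event loop thread. An observable that emits from another thread would probably need `loop.call_soon_threadsafe` around the `put_nowait` calls, since `asyncio.Queue` is not thread-safe.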
dfee commented 6 years ago

Hmm... I seem to be having this issue if using middleware – even a dummy middleware:

def middleware(next, root, info, **args):
    return next(root, info, **args)

Annoying, I'm about to go hunting.

dfee commented 6 years ago

I've found the answer. TLDR: use a MiddlewareManager.

The entrypoint for graphql queries is the execute method

The relevant bit is this conditional:

    if middleware:
        if not isinstance(middleware, MiddlewareManager):
            middleware = MiddlewareManager(*middleware)

So what happens when you don't supply a MiddlewareManager, but choose to supply a middleware callable (a function or a class instance with a resolve method)?

Well everything operates normally for a while. First, execute_operation ... occurs.

Assuming you've passed allow_subscriptions to the execute, schema.execute, or just used subscribe, then execute_operation calls subscribe_fields.

Inevitably, subscribe_fields calls subscribe_field which is where the trouble occurs.

In this function, a result is needed, so first, your resolve function is grabbed from the MiddlewareManager - the one you didn't supply with wrap_in_promise=False as a kwarg. So your resolve function will now be returned as a promise.

So next up, resolve_or_error is called, which ultimately invokes the executor (by default a SyncExecutor, but if you've passed a custom one, that's OK).

As you can see, executor.execute actually does the calling of your resolve function. But, alas, the function it's calling returns the subscription-unfriendly (damned?) promise and subscribe_field barfs.

So the moral of the story is to supply a MiddlewareManager as such:

from graphql.execution.middleware import MiddlewareManager

my_middleware_manager = MiddlewareManager(my_middleware1, my_middleware2, wrap_in_promise=False)

schema.execute(statement, middleware=my_middleware_manager, allow_subscriptions=True)

Good luck out there.

@syrusakbary if subscribe_field is promise unfriendly, perhaps the MiddlewareManager instantiated in execute should supply wrap_in_promise=False if we're operating on a subscription, or alternatively allow MiddlewareManager.get_field_resolver to receive a wrap_in_promise override.
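
The failure mode described above can be reproduced in miniature without graphql-core. Everything in this sketch is a hypothetical stand-in (`FakePromise`, `FakeObservable`, `wrap_resolver`, `subscribe_field_check`). It only models the shape of the problem: a type check that expects an observable but receives a promise wrapping one.

```python
class FakeObservable:
    """Stand-in for rx.Observable."""


class FakePromise:
    """Stand-in for a promise that already holds its value."""

    def __init__(self, value):
        self._value = value

    def get(self):
        return self._value


def wrap_resolver(resolver, wrap_in_promise=True):
    """Model of a middleware chain that may wrap results in a promise."""
    def wrapped(*args, **kwargs):
        result = resolver(*args, **kwargs)
        return FakePromise(result) if wrap_in_promise else result
    return wrapped


def subscribe_field_check(result):
    """Model of the type check that rejects anything but an observable."""
    if not isinstance(result, FakeObservable):
        raise TypeError(
            'Subscription must return Async Iterable or Observable. '
            'Received: {!r}'.format(result))
    return result


def resolve_subscription():
    return FakeObservable()


def outcome(resolver):
    """Run the resolver through the check and report what happened."""
    try:
        subscribe_field_check(resolver())
        return 'accepted'
    except TypeError:
        return 'rejected'


default_outcome = outcome(wrap_resolver(resolve_subscription))
unwrapped_outcome = outcome(
    wrap_resolver(resolve_subscription, wrap_in_promise=False))
```

With the default wrapping the check rejects the result, and with `wrap_in_promise=False` it accepts it, which mirrors the behaviour reported in this thread.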

dfee commented 6 years ago

For what it's worth, this middleware also does the trick:

from promise import Promise

def depromise_subscription(next, root, info, **args):
    result = next(root, info, **args)
    if info.operation.operation == 'subscription' and isinstance(result, Promise):
        return result.get()
    return result
IPetrik commented 4 years ago

I arrived here because I am trying to get this example to work:

import asyncio
import graphene

class Query(graphene.ObjectType):
    base = graphene.String()

class Subscription(graphene.ObjectType):
    count_seconds = graphene.Float(up_to=graphene.Int())

    async def resolve_count_seconds(root, info, up_to):
        for i in range(up_to):
            yield i
            await asyncio.sleep(1.)
        yield up_to

if __name__ == '__main__':
    schema = graphene.Schema(query = Query, subscription = Subscription)
    r = schema.execute("subscription { countSeconds(upTo: 10) }", allow_subscriptions=True)
    print(r.errors)

But I get the same error [GraphQLError('Subscription must return Async Iterable or Observable. Received: <async_generator object Subscription.resolve_count_seconds at 0x0000024F3AE450D0>')] as the original post in this issue thread. What is the proper way to use an async iterator as a subscription provider? Although the error message says that Async Iterators are acceptable, the source code seems to only allow Observables.

https://github.com/graphql-python/graphql-core/blob/2c8728e43fc4dfd5c2516c5028ce390158b8fd2b/graphql/execution/executor.py#L422-L427

Is it correct that only Observables are supported?
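
Whatever the legacy executor accepts, the generator itself is fine as far as asyncio is concerned. If an Observable is what the executor needs, one conceivable bridge is to drive the generator on an event loop and forward its items to observer-style callbacks, which could then feed something like the `rx.subjects.Subject` mentioned earlier. This is an assumption, not something confirmed in this thread or tested against graphql-core, and the `pump` helper below is hypothetical.

```python
import asyncio


async def count_seconds(up_to, delay=0.01):
    """Same shape as resolve_count_seconds, with a short delay for the demo."""
    for i in range(up_to):
        yield i
        await asyncio.sleep(delay)
    yield up_to


async def pump(source, on_next, on_error, on_completed):
    """Forward every item of an async iterable to observer-style callbacks."""
    try:
        async for item in source:
            on_next(item)
    except Exception as exc:
        on_error(exc)
    else:
        on_completed()


events = []
asyncio.run(pump(
    count_seconds(3),
    on_next=lambda value: events.append(('next', value)),
    on_error=lambda exc: events.append(('error', exc)),
    on_completed=lambda: events.append(('completed', None)),
))
```

The callbacks here just record events in a list. In a real setup they would be the `on_next`, `on_error` and `on_completed` methods of whatever object the executor is willing to subscribe to.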