@tricoder42 what query are you using with your code? Doesn't the model get read from the database for each subscriber? Did you mean to pass an observer to your stream initiation here (L71)?
@japrogramer Yeah, each subscriber needs to load the instance from the DB. It could be optimised for a single worker, but once you start scaling up, each subscriber might be handled by a different process, and you can only pass serializable objects through channels.
StreamObservable could actually be replaced with rx.Subject, which does the same thing.
Example query:

```graphql
subscription AlertsSubscription {
  alerts {
    id
    code
    date
    read
  }
}
```
@tricoder42 how about using DataLoaders from the promise library to fetch the instance, so that the DB is only hit once per process?
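A minimal sketch of the idea behind the last two comments, assuming hypothetical helpers `fetch_from_db`, `encode_change` and `deliver_change`: the channel layer carries only a JSON-serializable reference (model label plus primary key), and each worker caches the lookup for the duration of one broadcast, so N subscribers in the same process cost one database hit. This swaps in a plain per-broadcast dict instead of the `promise` DataLoader's batching, and `fetch_from_db` stands in for a real ORM query.

```python
import json

# Counts simulated database hits so the effect of the per-broadcast cache is visible.
DB_HITS = {"count": 0}


def fetch_from_db(model_label, pk):
    """Hypothetical stand-in for a real ORM lookup such as Model.objects.get(pk=pk)."""
    DB_HITS["count"] += 1
    return {"model": model_label, "pk": pk}


def encode_change(model_label, pk):
    # Only a JSON-serializable reference crosses the channel layer, never the instance.
    return json.dumps({"model": model_label, "pk": pk})


def deliver_change(raw_message, subscribers):
    """Deliver one change to every subscriber handled by this worker process.

    The cache lives only for this one broadcast, so N local subscribers cost a
    single lookup, while the next update still reads fresh data.
    """
    ref = json.loads(raw_message)
    cache = {}
    delivered = []
    for subscriber in subscribers:
        key = (ref["model"], ref["pk"])
        if key not in cache:
            cache[key] = fetch_from_db(*key)
        delivered.append((subscriber, cache[key]))
    return delivered
```

Scoping the cache to a single broadcast (rather than caching for the life of the process) is a deliberate choice: subscriptions exist to report changes, so a long-lived cache would eventually serve stale instances.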
Dear @syrusakbary, on July 30 you made a great comment about where the subscriptions implementation should live, and you mentioned that you had started researching it in a branch. I just wanted to ask for an update. Did your research succeed? Should we expect subscriptions to become part of the Graphene library eventually? I see there are a couple of implementations available (e.g. GraphQL Subscription with django-channels, graphene-django-subscriptions), but none of them can be used in production out of the box (for various reasons).
@prokher actually I'm using @tricoder42's implementation of GraphQL subscriptions in production (with minor modifications, as I'm using Python 3.5 and a Redis backend), and it works perfectly (and is much cleaner than graphene-django-subscriptions).
Building bridges between the two packages may be time-consuming, but if you want to use subscriptions right now, I recommend simply migrating the code into your project, as it won't be much work.
@heyrict Thank you very much, we came to the same conclusion. The code in the gist is indeed very simple and clean. That is definitely the way to go. Many thanks to @tricoder42.
BTW, just for information: the protocol used by the subscriptions-transport-ws (WebSocket-based transport used by Apollo subscriptions) is described here.
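To make the protocol concrete, here is a small sketch of how a server might answer the main client frames of that protocol. The message type strings are the ones subscriptions-transport-ws uses; `handle_frame` and the `execute` callback are hypothetical names, and subscriptions themselves (streams of `data` frames) are deliberately not modeled.

```python
import json

# Message types of the subscriptions-transport-ws ("graphql-ws") protocol.
GQL_CONNECTION_INIT = "connection_init"
GQL_CONNECTION_ACK = "connection_ack"
GQL_CONNECTION_TERMINATE = "connection_terminate"
GQL_START = "start"
GQL_DATA = "data"
GQL_COMPLETE = "complete"
GQL_STOP = "stop"


def handle_frame(raw, execute):
    """Return the list of server frames to send back for one client frame.

    `execute(query, variables, operation_name)` is a hypothetical callback that
    runs a single (non-subscription) operation and returns a (data, errors) tuple.
    """
    frame = json.loads(raw)
    kind = frame.get("type")
    if kind == GQL_CONNECTION_INIT:
        return [{"type": GQL_CONNECTION_ACK}]
    if kind == GQL_START:
        # query, variables and operationName all travel inside "payload"
        payload = frame.get("payload") or {}
        data, errors = execute(
            payload.get("query"),
            payload.get("variables"),
            payload.get("operationName"),
        )
        op_id = frame.get("id")
        return [
            {"id": op_id, "type": GQL_DATA, "payload": {"data": data, "errors": errors}},
            # a one-shot query or mutation is finished after its single data frame
            {"id": op_id, "type": GQL_COMPLETE},
        ]
    # GQL_STOP: a real server disposes of the subscription stored under frame["id"];
    # GQL_CONNECTION_TERMINATE: it closes the socket. This sketch sends nothing for
    # either, nor for unknown types.
    return []
```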
What do you think about not sending useless data back to the client? I am currently getting lots of subscription payloads like {'subscriptionName': None}.

```python
import json


# Method on the consumer class from the gist
def _send_result(self, id, result):
    # Don't send results if no useful data was generated
    errors = result.errors
    if not errors:
        if not isinstance(result.data, dict):
            return
        if not any(value is not None for value in result.data.values()):
            return
    self.send({
        'type': 'websocket.send',
        'text': json.dumps({
            'id': id,
            'type': 'data',
            'payload': {
                'data': result.data,
                'errors': list(map(str, errors)) if errors else None,
            },
        }),
    })
```
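The filtering rule can be pulled out into a standalone predicate so it can be unit-tested without a running consumer. A minimal sketch: `has_useful_data` is a hypothetical helper name, and `FakeResult` is a stand-in for graphql-core's `ExecutionResult`, assumed to expose only `data` and `errors`.

```python
from dataclasses import dataclass, field


@dataclass
class FakeResult:
    """Hypothetical stand-in for graphql-core's ExecutionResult (data + errors)."""
    data: object = None
    errors: list = field(default_factory=list)


def has_useful_data(result):
    """Same rule as _send_result: skip frames that carry neither data nor errors."""
    if result.errors:
        return True  # errors are always worth reporting to the client
    if not isinstance(result.data, dict):
        return False
    # e.g. {'subscriptionName': None} carries nothing worth sending
    return any(value is not None for value in result.data.values())
```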
Hey, quick follow-up: has anyone managed to make an asynchronous consumer for subscriptions?
Currently I'm using threads as the executor, but I'm switching over to AsyncioExecutor (from graphql.execution.executors.asyncio import AsyncioExecutor).
I'm mocking out some code; here is what I have so far:
```python
from django.utils.translation import get_language
from asgiref.sync import SyncToAsync
import json
import functools
import asyncio
import rx
from rx.subjects import Subject
from rx.concurrency import AsyncIOScheduler
from channels.consumer import AsyncConsumer
from channels.exceptions import StopConsumer
from graphql.execution.executors.thread import ThreadExecutor
from graphene_django.settings import graphene_settings as gqsettings
from .views import DataLoaders

schema = gqsettings.SCHEMA


class GQLConsumer(AsyncConsumer):
    # NOTE: asgiref.SyncToAsync for django ORM
    def __init__(self, scope):
        super().__init__(scope)
        # keeps a record of streams
        self.subscriptions = {}
        # keeps a record of groups the connection belongs to
        self.groups = {}
        # scheduler
        # self.scheduler = AsyncIOScheduler()

    # @allowed_hosts_only
    async def websocket_connect(self, event):
        # message.reply_channel.send({'accept': True, 'text': json.dumps({'type': 'connection_ack'})})
        # TODO: This might need some security, auth users or apps only <10-11-17> #
        await self.send({
            "type": "websocket.accept",
            "subprotocol": "graphql-ws",
        })

    async def websocket_receive(self, message):
        # message is gone from the call signature, need to inspect the content of text_data and bytes_data
        request = json.loads(message['text'])
        if request['type'] == 'connection_init':
            await self.send({
                'type': 'websocket.send',
                'text': json.dumps({'type': 'connection_ack'}),
            })
        elif request['type'] == 'start':
            payload = request.get('payload')
            id = request.get('id')
            # This part acts like a request, QUESTION: should this be a dict like object that set/get from the self.scope?
            message = dict()
            message['id'] = id
            message['reply_channel'] = self.channel_name
            message['scope'] = self.scope
            message['groups'] = self.groups
            message['dataloaders'] = DataLoaders(get_language())
            stream = Subject()
            # TODO: Implement weight, can this query run for this user or is it too expensive <10-11-17> #
            # TODO: Implement timeout mechanism <10-11-17> #
            # (asyncio.wait_for needs an awaitable plus a timeout; schema.execute is
            # synchronous here, so it is called directly.)
            # result = await SyncToAsync(schema.execute)(self.query, variable_values=self.foovar, allow_subscriptions=True, **self.kwargs)
            result = schema.execute(
                payload['query'],
                # graphql-ws sends operationName inside the payload
                operation_name=payload.get('operationName'),
                variable_values=payload.get('variables'),
                # graphql-core expects its own executor class, not a concurrent.futures pool
                executor=ThreadExecutor(),
                root_value=rx.Observable.create(stream).share(),
                allow_subscriptions=True,
                context_value=message,
            )
            if isinstance(result, rx.Observable):
                result = result.publish().auto_connect()
                # NOTE: _send_result is a coroutine function, so this only creates
                # coroutine objects that nobody awaits (see the fix later in the thread).
                result.subscribe(functools.partial(self._send_result, id))
                self.subscriptions[id] = stream
            else:
                await self._send_result(id, result)
        elif request['type'] == 'stop':
            operationName = request.get('operationName')
            await self.channel_layer.group_discard(operationName, self.channel_name)

    async def websocket_disconnect(self, message):
        for group in self.groups.keys():
            await self.channel_layer.group_discard(group, self.channel_name)
        await self.send({
            "type": "websocket.close", "code": 1000
        })
        raise StopConsumer()

    async def _send_result(self, id, result):
        errors = result.errors
        await self.send({
            'type': 'websocket.send',
            'text': json.dumps({
                'id': id,
                'type': 'data',
                'payload': {
                    'data': result.data,
                    'errors': list(map(str, errors)) if errors else None,
                },
            }),
        })

    def model_changed(self, message):
        forwhat = message['models']
        ...
```
OK, I'm a bit closer to having an async consumer, but there is one small issue. Let me go into detail before you get to the code. I am currently attempting to use an async executor to run the schema, but that causes the result to be a <graphql.execution.base.ExecutionResult object at 0x7f901c35f488> instead of an observable.
The errors property returns this:
[RuntimeError('This event loop is already running',)]
So I would expect the schema not to execute, since these errors should mark the end of the execution and nothing should be resolved. However, some strange behaviour occurs: I set a breakpoint inside the resolve method for the subscription, and after the schema executes and returns, that breakpoint is hit:
```
> /app/apple/graphquery/consumers.py(74)websocket_receive()
-> result = schema.execute(
(Pdb) c
> /app/apple/graphquery/consumers.py(84)websocket_receive()
-> if isinstance(result, rx.Observable):
(Pdb) result
<graphql.execution.base.ExecutionResult object at 0x7f901c35f488>
(Pdb) result.errors
[RuntimeError('This event loop is already running',)]
(Pdb) c
> /app/apple/product/schema.py(179)resolve_sub_product()
-> await make_sub(info, input.get('product'))
(Pdb) ll
177     async def resolve_sub_product(self, info, **input):
178         __import__('pdb').set_trace()
179  ->     await make_sub(info, input.get('product'))
180         name = ProductType._meta.model.__class__.__name__
181
182         stream = info.root_value
183         return stream.map(lambda message: self.next(message, info, **input))
(Pdb)
```
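A likely explanation, offered as an assumption rather than a diagnosis: graphql-core 2's AsyncioExecutor schedules coroutine resolvers as tasks on the loop and then waits for them with `loop.run_until_complete()`, which asyncio refuses to do while a Channels AsyncConsumer is already running on that same loop. The already-scheduled resolver task would then still run later on the live loop, which would match the breakpoint being hit after `schema.execute` returned. The stdlib-only sketch below reproduces the same `RuntimeError` and shows the non-blocking alternative; the function names are hypothetical.

```python
import asyncio


async def nested_run():
    """Reproduce the error: drive the loop re-entrantly from inside a coroutine."""
    loop = asyncio.get_running_loop()
    coro = asyncio.sleep(0)
    try:
        loop.run_until_complete(coro)
    except RuntimeError as exc:
        return str(exc)
    finally:
        coro.close()  # the coroutine was never scheduled; close it to avoid a warning
    return "no error"


async def awaited_instead():
    """The non-blocking alternative: await the work on the loop that is already running."""
    await asyncio.sleep(0)
    return "ok"


print(asyncio.run(nested_run()))  # prints: This event loop is already running
print(asyncio.run(awaited_instead()))  # prints: ok
```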
Here is what the updated consumer looks like:
```python
from django.utils.translation import get_language
from asgiref.sync import SyncToAsync
from rx.subjects import Subject
from rx.concurrency import AsyncIOScheduler
from channels.consumer import AsyncConsumer
from channels.exceptions import StopConsumer
from graphql.execution.executors.asyncio import AsyncioExecutor
from graphql import graphql
from graphene_django.settings import graphene_settings as gqsettings
from .views import DataLoaders
import rx
import json
import functools
import asyncio

schema = gqsettings.SCHEMA


class GQLConsumer(AsyncConsumer):
    # NOTE: asgiref.SyncToAsync for django ORM
    def __init__(self, scope):
        super().__init__(scope)
        # keeps a record of streams
        self.subscriptions = {}
        # keeps a record of groups the connection belongs to
        self.groups = {}
        # self.executor = AsyncioExecutor(loop=asyncio.get_event_loop())

    # @allowed_hosts_only
    async def websocket_connect(self, event):
        # message.reply_channel.send({'accept': True, 'text': json.dumps({'type': 'connection_ack'})})
        # TODO: This might need some security, auth users or apps only <10-11-17> #
        await self.send({
            "type": "websocket.accept",
            "subprotocol": "graphql-ws",
        })

    async def websocket_receive(self, message):
        # message is gone from the call signature, need to inspect the content of text_data and bytes_data
        request = json.loads(message['text'])
        if request['type'] == 'connection_init':
            await self.send({
                'type': 'websocket.send',
                'text': json.dumps({'type': 'connection_ack'}),
            })
        elif request['type'] == 'start':
            payload = request.get('payload')
            id = request.get('id')
            # This part acts like a request, QUESTION: should this be a dict like object that set/get from the self.scope?
            message = dict()
            message['id'] = id
            message['reply_channel'] = self.channel_name
            message['scope'] = self.scope
            message['subscribe'] = functools.partial(self._subscribe, id)
            message['dataloaders'] = DataLoaders(get_language())
            stream = Subject()
            # TODO: Implement weight, can this query run for this user or is it too expensive <10-11-17> #
            # TODO: Implement timeout mechanism <10-11-17> #
            # result = await SyncToAsync(schema.execute)(self.query, variable_values=self.foovar, allow_subscriptions=True, **self.kwargs)
            __import__('pdb').set_trace()
            result = schema.execute(
                payload['query'],
                variable_values=payload['variables'],
                root_value=rx.Observable.create(stream).share(),
                allow_subscriptions=True,
                executor=AsyncioExecutor(loop=asyncio.get_event_loop()),
                context_value=message,
            )
            __import__('pdb').set_trace()
            if isinstance(result, rx.Observable):
                result = result.publish().auto_connect()
                result.subscribe(functools.partial(self._send_result, id))
                self.subscriptions[id] = stream
            else:
                await self._send_result(id, result)
        elif request['type'] == 'stop':
            await self._unsubscribe(request)

    async def websocket_disconnect(self, *args, **kwargs):
        for group in self.groups.keys():
            await self.channel_layer.group_discard(group, self.channel_name)
        await self.send({
            "type": "websocket.close", "code": 1000
        })
        raise StopConsumer()

    def _subscribe(self, id, gp_name):
        # map each channel layer group to the operation ids listening on it
        self.groups.setdefault(gp_name, set()).add(id)

    async def _unsubscribe(self, request):
        # graphql-ws 'stop' frames carry only the operation id, so remove that id
        # from every group and leave the groups that no longer have listeners
        id = request.get('id')
        self.subscriptions.pop(id, None)
        for gp_name, ids in list(self.groups.items()):
            ids.discard(id)
            if not ids:
                del self.groups[gp_name]
                await self.channel_layer.group_discard(gp_name, self.channel_name)

    async def _send_result(self, id, result):
        errors = result.errors
        await self.send({
            'type': 'websocket.send',
            'text': json.dumps({
                'id': id,
                'type': 'data',
                'payload': {
                    'data': result.data,
                    'errors': list(map(str, errors)) if errors else None,
                },
            }),
        })

    async def model_changed(self, message):
        __import__('pdb').set_trace()
        gp_name = message['gp_name']
        pk = message['pk']
        for id in self.groups.get(gp_name, []):
            stream = self.subscriptions.get(id)
            if not stream:
                continue
            # `model` was undefined here; forward the channel layer message instead
            stream.on_next((pk, message))
```
The mysterious async subscription resolver that runs after the execution has already failed:
```python
import graphene
from graphene import relay

# ProductType and make_sub are defined elsewhere in the project (make_sub is shown below).


class ProductSubscritption(object):
    """test"""
    sub_product = graphene.Field(
        ProductType,
        description='subscribe to updated product',
        product=graphene.ID())

    async def resolve_sub_product(self, info, **input):
        __import__('pdb').set_trace()
        await make_sub(info, input.get('product'))
        name = ProductType._meta.model.__name__  # model class name (currently unused)
        stream = info.root_value
        # In Graphene 2 the first resolver argument is the root value (the stream),
        # not an instance of this class, so call next() through the class.
        return stream.map(lambda message: ProductSubscritption.next(message, info, **input))

    @classmethod
    def next(cls, message, info, **input):
        # here the message comes from the stream, but info and **input come from
        # subscribing the next method to the stream
        inst = relay.Node.get_node_from_global_id(info, input.get('product'))
        return inst
```
And here is the signal registration. It doesn't seem to be related to the issue I'm having, but just in case you would like to look at it:
```python
from asgiref.sync import AsyncToSync
from channels.layers import get_channel_layer
from graphene import relay
from graphql_relay import from_global_id as fgi  # , to_global_id
from promise.dataloader import DataLoader
from promise import Promise
import json

channel_layer = get_channel_layer()
group_send = AsyncToSync(channel_layer.group_send)


def send_update(sender, instance, created, attr='pk', *args, **kwargs):
    value = str(getattr(instance, attr))
    payload = {
        'type': 'model.changed',
        'pk': instance.pk,
        'attr': value,
        'created': created,
    }
    # if the instance was created, send to channels listening for created
    if created:
        group_send(
            'gqp.{0}-add'.format(instance.__class__.__name__.lower()),
            payload
        )
        return
    # pk is preferred over the provided attr if available
    if hasattr(instance, 'pk'):
        name = getattr(instance, 'pk')
    else:
        name = value
    gp_name = 'gqp.{0}-updated.{1}'.format(instance.__class__.__name__.lower(), name)
    payload['gp_name'] = gp_name
    # If the item was updated, send signal to channels listening for updates
    group_send(gp_name, payload)


async def make_sub(info, gid):
    inst = relay.Node.get_node_from_global_id(info, gid)
    try:
        gp_name = 'gqp.{0}-updated.{1}'.format(inst.__class__.__name__.lower(), inst.pk)
        # Group(gp_name).add(info.context.reply_channel)
        # info.context.channel_session['Groups'] = ','.join( (gp_name, info.context.channel_session['Groups']))
        await channel_layer.group_add(
            gp_name,
            info.context['reply_channel']
        )
        subscribe = info.context['subscribe']
        if subscribe:
            subscribe(gp_name)
    except Exception:
        # e.g. the node was not found (inst is None) or the context has no 'subscribe'
        pass
```
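send_update and make_sub each rebuild the same group name string by hand; if the two ever drift apart, updates silently go to a group nobody listens to. A sketch of a shared helper (the names `updated_group_name` and `created_group_name` are hypothetical) that both sides could call. The validation mirrors the rule Django Channels documents for group names (ASCII letters, digits, hyphens, underscores and periods, shorter than 100 characters); treat the exact regex here as an assumption rather than Channels' own code.

```python
import re

# Assumed to match Channels' documented group-name rule.
_VALID_GROUP = re.compile(r"[A-Za-z0-9._-]+")
MAX_GROUP_NAME_LENGTH = 100


def _checked(name):
    if len(name) >= MAX_GROUP_NAME_LENGTH or not _VALID_GROUP.fullmatch(name):
        raise ValueError("invalid channel layer group name: %r" % name)
    return name


def updated_group_name(model_name, pk):
    """Group notified when one instance changes, e.g. 'gqp.product-updated.5'."""
    return _checked("gqp.{0}-updated.{1}".format(model_name.lower(), pk))


def created_group_name(model_name):
    """Group notified when an instance of the model is created, e.g. 'gqp.product-add'."""
    return _checked("gqp.{0}-add".format(model_name.lower()))
```

With this in place, send_update would call `updated_group_name(instance.__class__.__name__, name)` and make_sub would call `updated_group_name(inst.__class__.__name__, inst.pk)`, so the format lives in one spot.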
Here I'm going to tag some people: @tricoder42 @prokher @hballard @syrusakbary
Getting closer: https://github.com/graphql-python/graphql-core/issues/63#issuecomment-396017113
I figured it out; I just needed this line to get the async observable working:

```python
result.subscribe(lambda t: loop.create_task(self._send_result(id, t)))
```
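Why that line works: the observable invokes its subscriber synchronously, so handing it the coroutine function `_send_result` only creates coroutine objects that nobody awaits. Wrapping the call in `loop.create_task` schedules each send on the running loop instead. The sketch below shows the pattern with stdlib only; `TinyObservable` is a hypothetical stand-in for an rx Observable, and keeping references to the tasks (as the asyncio docs recommend) also lets the caller wait for them.

```python
import asyncio


class TinyObservable:
    """Hypothetical stand-in for an rx Observable: calls subscribers synchronously."""

    def __init__(self):
        self._observers = []

    def subscribe(self, on_next):
        self._observers.append(on_next)

    def on_next(self, value):
        for observer in self._observers:
            observer(value)


async def main():
    loop = asyncio.get_running_loop()
    sent = []
    tasks = []

    async def send_result(op_id, result):
        await asyncio.sleep(0)  # stands in for `await self.send(...)`
        sent.append((op_id, result))

    stream = TinyObservable()
    # The callback stays synchronous; it only schedules the coroutine on the loop.
    stream.subscribe(lambda t: tasks.append(loop.create_task(send_result("1", t))))

    stream.on_next({"data": {"subProduct": {"title": "A"}}})
    stream.on_next({"data": {"subProduct": {"title": "B"}}})
    await asyncio.gather(*tasks)  # wait until every scheduled send has finished
    return sent


print(asyncio.run(main()))
```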
I'm currently using a workaround for this issue: https://github.com/graphql-python/graphene-django/issues/452
This is the workaround: instead of using subscriptions directly, I use them to trigger regular queries.
```javascript
subscribeToNewMessages() {
  this.subscription = this.props.data.subscribeToMore({
    document: gql`
      subscription ChangeInProduct($id: ID!) {
        subProduct(product: $id) {
          title
        }
      }
    `,
    variables: { id: this.props.match.params ? this.props.match.params.id : null },
    updateQuery: (prev, { subscriptionData }) => {
      if (!subscriptionData.data) {
        return prev;
      }
      console.log('interesting', prev, subscriptionData);
      this.props.data.refetch();
      return prev;
    },
  });
}
```
JFYI: We have finally published our in-house subscriptions implementation, DjangoChannelsGraphqlWs. It is a simple WebSocket-based GraphQL server built on Django Channels, with subscriptions implemented in a Graphene-like style. The work is still in progress, but someone may find it useful.
Ahhh! I have been waiting for this. @prokher I will definitely check it out.
@Musbell be careful, it is not ready for production yet. We are currently working hard to improve it (adding asynchronous message handling, ordering/serialization, examples, etc.), and we are happy to receive any feedback you have.
@prokher thanks for the notice. :+1:
Subscriptions should be in Graphene 2.0+; @syrusakbary mentioned it in this tweet: https://twitter.com/syrusakbary/status/923325568157859840?lang=en I can't find the function in the repo or any documentation, so let's hope we get an update soon.
Subscriptions introduced in v2! https://github.com/graphql-python/graphene/releases/tag/v2.0.0
@mvanlonden correct me if I'm wrong, but tag v2.0.0 has been out for a while, and I can see no commit mentioning subscriptions, either in the commits or in the tags that follow.
Could you provide an example implementation of a subscription? Would one need to use Channels in Django for this?
@japrogramer it is not documented well/at all but documentation needs are tracked here. This commit added support for subscriptions.
https://github.com/eamigo86/graphene-django-subscriptions is an example of using graphene-django with subscriptions although we'd like to implement the functionality natively in graphene-django
Mind submitting a PR with documentation for subscriptions?
@mvanlonden that has been possible for a while; I thought your announcement meant that message delivery would be handled by Graphene itself.
I'm not sure this issue should be closed, since the Apollo integration still takes some setup, even with a simple lib.
Case in point:
At any rate, here is another successful attempt; this time I wrote it for aiohttp.
```python
from graphql import GraphQLError
from aiograph.redistools.pubsub import subscribe, reader
from aiograph.redistools.utils import create_redis
from graphene.types import generic
import graphene
import asyncio


class TestType(graphene.ObjectType):
    name = "test"
    msg = graphene.String()


class Query(object):
    test_field = graphene.Field(TestType, msg=graphene.String())

    async def resolve_test_field(self, info, **args):
        msg = args.get('msg', None)
        return TestType(msg=msg)


class MessageMutation(graphene.Mutation):
    status = graphene.Int()
    errors = generic.GenericScalar()

    class Arguments:
        channel = graphene.String(required=True)
        msg = graphene.String(required=True)

    @staticmethod
    async def mutate(root, info, **input):
        try:
            redis = await create_redis()
            await redis.publish(
                input.get('channel'),
                input.get('msg'))
            return MessageMutation(status=200, errors=None)
        except Exception as e:
            raise GraphQLError('An error occurred.') from e


class TestSubscription(object):
    test_updated = graphene.Field(TestType, channel=graphene.String())

    async def resolve_test_updated(root, info, **input):
        def next(message):
            return TestType(msg=message)

        redis = await create_redis()
        channel = await subscribe(redis, input.get('channel', 'default'))
        async for i in reader(channel[0]):
            # Here we yield the actual object for the TestType
            yield next(i)


class Mutations(object):
    send_message = MessageMutation.Field()
```
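The subscription resolver above is an async generator that yields one object per Redis message. The same shape can be exercised without Redis or Graphene: in this sketch an `asyncio.Queue` with a `None` sentinel stands in for the author's `aiograph` reader, and a plain dict stands in for `TestType`. Both substitutions, and the names `queue_reader` and `resolve_test_updated_sketch`, are hypothetical; how the executor consumes the generator is not modeled.

```python
import asyncio


async def queue_reader(queue):
    """Hypothetical stand-in for `reader(channel)`: yield messages until a None sentinel."""
    while True:
        message = await queue.get()
        if message is None:
            break
        yield message


async def resolve_test_updated_sketch(queue):
    # Same shape as TestSubscription.resolve_test_updated: wrap each raw message.
    async for raw in queue_reader(queue):
        yield {"msg": raw}


async def main():
    queue = asyncio.Queue()
    for item in ("hello", "world", None):
        queue.put_nowait(item)
    return [event async for event in resolve_test_updated_sketch(queue)]


print(asyncio.run(main()))  # prints: [{'msg': 'hello'}, {'msg': 'world'}]
```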
```python
from contextlib import suppress
from aiohttp_session import get_session
from aiohttp import web, WSMsgType
from graphql.execution.executors.asyncio import AsyncioExecutor
from graphql import graphql
from config.settings import settings
from config.schema import schema
import asyncio
import functools
import json
import rx
from .utils import format_response, run_aiograph
# import the logging library
import logging

logger = logging.getLogger('asyncio')


async def graph_handler(request):
    session = await get_session(request)
    last_visit = session['last_visit'] if 'last_visit' in session else None
    if request.content_type in ('application/graphql', 'application/json'):
        payload = json.loads(await request.text())
        with suppress(NameError):
            message = dict()
            message['request'] = request
            result = await run_aiograph(
                payload,
                executor=AsyncioExecutor(loop=asyncio.get_event_loop()),
                allow_subscriptions=False,
                return_promise=True,
                context_value=message,
            )
            response = format_response(result)
            return web.json_response(response)


class WebSocket(web.View):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.loop = asyncio.get_event_loop()

    async def get(self):
        ws = web.WebSocketResponse()
        await ws.prepare(self.request)
        session = await get_session(self.request)
        # use one key consistently and always store a list of sockets
        # (creating the list in an on_startup handler avoids aiohttp's warning
        # about changing the state of a running application)
        if 'websockets' in self.request.app:
            self.request.app['websockets'].append(ws)
        else:
            self.request.app['websockets'] = [ws]
        async for msg in ws:
            if msg.type == WSMsgType.text:
                data = json.loads(msg.data)
                if data['type'] == 'close':
                    await ws.close()
                else:
                    if data['type'] == 'connection_init':
                        logger.debug('connection_init')
                        # graphql-ws clients expect a bare connection_ack frame
                        await ws.send_json({'type': 'connection_ack'})
                    if data['type'] == 'start':
                        await self._start(ws, data)
                    if data['type'] == 'stop':
                        await ws.send_str(str(data))
            elif msg.type == WSMsgType.error:
                logger.debug('ws connection closed with exception %s', ws.exception())
        if 'websockets' in self.request.app:
            self.request.app['websockets'].remove(ws)
        logger.debug('websocket connection closed')
        return ws

    async def _start(self, ws, data):
        message = dict()
        message['request'] = self.request
        result = await run_aiograph(
            data,
            allow_subscriptions=True,
            return_promise=True,
            context_value=message,
        )
        if isinstance(result, rx.Observable):
            logger.debug('result is an Observable')
            result.subscribe(lambda t: self.loop.create_task(self._send_result(ws, t)))
        else:
            response = format_response(result)
            await ws.send_json(response)

    async def _send_result(self, ws, result):
        await ws.send_json(format_response(result))
```
Since this was a quick test, this simple client sufficed:
```html
<!doctype html>
<html lang="en">
<head>
  <meta charset="utf-8">
  <title>Humble beginnings</title>
  <meta name="description" content="Test bed">
  <meta name="author" content="me">
</head>
<body>
  <script>
    var socket = new WebSocket("ws://" + window.location.host + "/wsgql");
    socket.onmessage = function (e) {
      // show the raw frame; alert() on a parsed object only prints [object Object]
      alert(e.data);
    };
    socket.onclose = function (event) {
      if (event.wasClean) {
        alert('Clean connection end');
      } else {
        alert('Connection broken');
      }
    };
    socket.onerror = function (error) {
      alert(error);
    };
    // Only one onopen handler survives assignment, so the greeting and the
    // subscription request live in the same function.
    socket.onopen = function () {
      alert('Connection to server started');
      var msg = {
        type: 'start',
        query: 'subscription { testUpdated(channel:"test"){ msg }}',
      };
      socket.send(JSON.stringify(msg));
    };
    console.log('about to send message.');
  </script>
</body>
</html>
```
@japrogramer: I'm wondering whether it makes sense to ditch Graphene subscriptions altogether for the sake of Apollo client integration, if there is no out-of-the-box support for it.
Say, the most minimal setup: Django users running on top of Postgres could simply run a standalone subscriptions-transport-ws server with graphql-postgres-subscriptions, and in the Graphene endpoint's mutations we would directly use Postgres's pub/sub, e.g. NOTIFY pinAdded, '{"pinAdded":{"title":"wow","id":0}}'
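A sketch of the mutation side of that setup, assuming a DB-API driver with `%s` placeholders such as psycopg2 (the helper name `build_notify` is hypothetical). It uses Postgres's `pg_notify(channel, payload)` function rather than the `NOTIFY` statement, because the statement only accepts a string literal while the function accepts bound parameters, and `json.dumps` guarantees the payload is valid JSON instead of a hand-written string. The graphql-postgres-subscriptions side is not shown.

```python
import json

# In the default Postgres configuration a NOTIFY payload must be shorter than 8000 bytes.
MAX_NOTIFY_PAYLOAD_BYTES = 8000


def build_notify(channel, payload):
    """Return (sql, params) to pass to e.g. psycopg2's cursor.execute(sql, params)."""
    body = json.dumps(payload)
    if len(body.encode("utf-8")) >= MAX_NOTIFY_PAYLOAD_BYTES:
        raise ValueError("NOTIFY payload too large: %d bytes" % len(body.encode("utf-8")))
    return "SELECT pg_notify(%s, %s)", (channel, body)


sql, params = build_notify("pinAdded", {"pinAdded": {"title": "wow", "id": 0}})
print(sql, params)
```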
Personally, I wouldn't consider graphql-postgres-subscriptions a suitable replacement for subscription functionality in Graphene. Obviously different people will have different needs, but if I'm building a GraphQL API for a Python project, divorcing my subscriptions backend from the main app introduces some undesirable complications. Unless you're just providing full, open access to your database, it would mean having to re-implement whatever authentication and permissions are involved in accessing your database. If you're using Django, for example, you no longer have access to the authentication backend and would need to re-implement it in Node, at which point why are you bothering with Python/Django/Graphene at all?
@joshourisman: that's a good point, thanks. (I just had the thought of passing temporary subscription tokens to clients on login for the things they can subscribe to, then verifying them upon subscription on the subscriptions-transport-ws side.)
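A minimal sketch of that token idea, under stated assumptions: Django and the separate WebSocket server share a hypothetical secret, the token lists the topics a user may subscribe to plus an expiry time, and it is signed with HMAC-SHA256. `issue_token` and `may_subscribe` are hypothetical names; the verifying side would have to reimplement `may_subscribe` in its own language with the same scheme. On the Django side, `django.core.signing` (e.g. `TimestampSigner`) offers a ready-made alternative for the signing half.

```python
import base64
import hashlib
import hmac
import json
import time

SECRET = b"change-me"  # hypothetical shared secret, known to Django and the WS server


def _sign(body):
    return base64.urlsafe_b64encode(hmac.new(SECRET, body, hashlib.sha256).digest())


def issue_token(user_id, topics, ttl_seconds=300, now=None):
    """Django side: sign the topics a user may subscribe to, with an expiry time."""
    now = time.time() if now is None else now
    claims = {"uid": user_id, "topics": sorted(topics), "exp": int(now + ttl_seconds)}
    body = base64.urlsafe_b64encode(json.dumps(claims, sort_keys=True).encode())
    return (body + b"." + _sign(body)).decode()


def may_subscribe(token, topic, now=None):
    """WS server side: verify signature and expiry, then check the requested topic."""
    now = time.time() if now is None else now
    try:
        body, sig = token.encode().split(b".")
    except ValueError:
        return False  # malformed token
    if not hmac.compare_digest(sig, _sign(body)):
        return False  # forged or tampered token
    claims = json.loads(base64.urlsafe_b64decode(body))
    return claims["exp"] > now and topic in claims["topics"]
```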
Basically, I have a legacy (somewhat large) Django app. I'm investigating integrating Graphene on top of it and hooking up Apollo Client with Angular / native iOS / native Android clients, in the hope of reducing the amount of logic needed to support the APIs. The subscription support built into the Apollo clients on each platform (I had barely used GraphQL before today) looks promising at first glance.
@ambientlight you can implement subscriptions with django-channels and Apollo; I've done it, but it's not trivial. Here is an example implementation that is similar to mine in the way it delivers messages to the client: https://github.com/datadvance/DjangoChannelsGraphqlWs
Does anyone know when subscriptions will be implemented natively in graphene?
@dspacejs because Graphene is deliberately decoupled from the common Python frameworks, it is difficult to implement a delivery system for all of them in this package.
However, I would like to see packages like graphene-django and graphene-flask implement their own handler/view that works for their platform, following the API closely enough that it works with Apollo. That way users don't have to roll their own implementation every time they want subscriptions, and all the eyes of the community are on one implementation for the best support.
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
I don't quite follow: is there native subscription support in Graphene or not? Should I drop my https://github.com/datadvance/DjangoChannelsGraphqlWs or not yet?
@prokher please don't drop the only one I could get working 😄
Are there any updates on this subject?
Hi everyone! I just released a GraphQL subscriptions implementation for Graphene + Django based on an internal implementation we’ve been using in production for the last 6 months at Jetpack (https://tryjetpack.com/). It builds on @tricoder42's gist implementation and takes inspiration from the great work that @japrogramer, @eamigo86 and @prokher have done. Since this is the canonical Graphene subscriptions issue I thought I would post it here.
Any updates? (Trying to keep the discussion alive.) :)
Hello @syrusakbary.
Thanks for all your hard work on graphene and graphql-python. Awesome library!!
I posted this on #393 earlier this week...reposting here so it's easier to discover.
I implemented a port of the apollo graphql subscriptions modules (graphql-subscriptions and subscriptions-transport-ws) for graphene / python. They work w/ apollo-client.
It is here.
Same basic API as the Apollo modules. It is still very rough, but it works so far, based on my limited internal testing. It uses redis-py, gevent-websockets, and syrusakbary/promises. I plan to add a simple example app, a setup.py for easier installation, and more information on the API to the readme in the next few days. A brief example is below. It only works on Python 2 for now. My plan is to start working on tests as well. I figured I'd go ahead and share it at this early stage in case anybody is interested...
I'm very new to open source, so any critiques or pull requests are welcome.
Simple example:
Server (using Flask and Flask-Sockets):
Of course on the server you have to "publish" each time you have a mutation (in this case to a redis channel). That could look something like this (using graphene / sql-alchemy):
Client (using react-apollo client):