Closed: predmond closed this issue 5 years ago
That would depend on what kind of subscription, for example long-polling or WebSocket. It also depends on your backend: Django, for example, with django-channels, could handle subscriptions by broadcasting to connected sockets from a post_save signal.
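As a hedged sketch of that post_save-broadcast idea (no Django here; `connected_sockets`, `broadcast`, and `Article` are made-up stand-ins for open WebSocket connections and a model):

```python
# Hypothetical sketch: broadcast to connected clients from a save hook.
# Plain callables stand in for Django signals and WebSocket sends.

connected_sockets = []  # stand-ins for open WebSocket connections

def broadcast(payload):
    """Push the payload to every connected client."""
    for send in connected_sockets:
        send(payload)

class Article:
    def __init__(self, title):
        self.title = title

    def save(self):
        # In Django this would be a post_save receiver rather than an
        # explicit call inside save().
        broadcast({'type': 'article.saved', 'title': self.title})

received = []
connected_sockets.append(received.append)  # "connect" one client
Article('hello').save()
print(received)  # → [{'type': 'article.saved', 'title': 'hello'}]
```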
The new databinding features in django-channels look ideal for graphql subscriptions: https://channels.readthedocs.io/en/stable/binding.html
I'd love to see an example of subscriptions in graphene using channels (or any other method) :)
I'd guess it's pretty straightforward to code a model-related subscription. I'm working with Django and plan to use Django Channels, so my suggestions and answers are opinionated that way.
Taking inspiration from how mutations work, one could push the data down to the customer quite easily. In the case of a Mutation, mutate is called as the resolver. I guess a general subscribe function could be written to hook up the customer channel to a "subscription-specific channel", itself hooked up to the "model channel". Using post_save, writing a custom save method, or using binding could do the job of propagating the data down to clients.
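The hookup described above could be sketched like this (an in-memory toy; `Channel`, the channel names, and the payload are all hypothetical stand-ins for real Django Channels groups):

```python
# Hypothetical layered hookup: model channel → subscription-specific
# channel → customer channels.

class Channel:
    def __init__(self, name):
        self.name = name
        self.subscribers = []  # callbacks, or a downstream channel's publish

    def subscribe(self, callback):
        self.subscribers.append(callback)

    def publish(self, payload):
        for callback in self.subscribers:
            callback(payload)

model_channel = Channel('user-model')
subscription_channel = Channel('new-users')
model_channel.subscribe(subscription_channel.publish)  # hook up the layers

inbox = []
subscription_channel.subscribe(inbox.append)  # a customer channel

model_channel.publish({'id': 1, 'username': 'alice'})
print(inbox)  # → [{'id': 1, 'username': 'alice'}]
```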
I don't know how to solve two things, though:
For the first point, I guess I could use a caching engine and a three-layer structure to push data to my channel, like: a Customer Channel hooked up to a Query Channel, which is itself hooked up to the Model Channel, with the query used in the Query Channel being stored in cache. It seems non-trivial but feasible.
For the second point, only a cache-based solution seems viable, storing filters in a dict, stored in cache. But that's a lot of work.
Any suggestion on those two questions is welcome!
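For what it's worth, the filter-storage idea from the second point might look roughly like this (a plain dict stands in for the cache backend; all names here are hypothetical):

```python
# Hypothetical sketch: store each subscription's filters in a dict (a
# stand-in for a cache backend) and apply them when model data is published.

subscription_filters = {}  # subscription_id -> filter dict
subscriber_inboxes = {}    # subscription_id -> delivered payloads

def subscribe(subscription_id, filters):
    subscription_filters[subscription_id] = filters
    subscriber_inboxes[subscription_id] = []

def publish(instance):
    # Deliver only to subscriptions whose stored filters match.
    for sub_id, filters in subscription_filters.items():
        if all(instance.get(k) == v for k, v in filters.items()):
            subscriber_inboxes[sub_id].append(instance)

subscribe('sub-1', {'active': True})
subscribe('sub-2', {'active': False})
publish({'id': 1, 'username': 'alice', 'active': True})
print(subscriber_inboxes['sub-1'])  # → [{'id': 1, 'username': 'alice', 'active': True}]
print(subscriber_inboxes['sub-2'])  # → []
```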
I'll give a basic architecture a try and create a shared repo to give some feedback.
@Jufik Any updates?
I implemented a port of the apollo graphql subscriptions modules (graphql-subscriptions and subscriptions-transport-ws) for graphene / python. They work w/ apollo-client.
It is here.
Same basic API. It is still very rough... but works so far, based on my limited testing. It uses redis-py, gevent-websocket, and syrusakbary/promise. It is still missing a few of the server options and keep-alive messages, but those are pretty easy to implement and I'll do that in the next few days. I was going to add a simple example app too; some of it is below. It only works on Python 2 for now and it needs tests, setup.py, etc. I figured I'd go ahead and share in case anybody is interested...
Simple example:
Server (using Flask and Flask-Sockets):
```python
import graphene
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_sockets import Sockets

from .subscription_manager import SubscriptionManager, RedisPubsub
from .subscription_transport_ws import ApolloSubscriptionServer

app = Flask(__name__)
sockets = Sockets(app)
pubsub = RedisPubsub()

# Query, Mutation, and Subscription are the graphene types defined elsewhere.
schema = graphene.Schema(
    query=Query,
    mutation=Mutation,
    subscription=Subscription
)

subscription_mgr = SubscriptionManager(schema, pubsub)

@sockets.route('/socket')
def socket_channel(websocket):
    subscription_server = ApolloSubscriptionServer(subscription_mgr, websocket)
    subscription_server.handle()
    return []

if __name__ == "__main__":
    from geventwebsocket import WebSocketServer
    server = WebSocketServer(('', 5000), app)
    print ' Serving at host 0.0.0.0:5000...\n'
    server.serve_forever()
```
Of course on the server you have to "publish" each time you have a mutation (in this case to a redis channel). That could look something like this (using graphene / sql-alchemy):
```python
# Assumes graphene, graphene_sqlalchemy, the User/UserModel types, the db
# session, and the pubsub object from the server snippet above are in scope.

class Subscription(graphene.ObjectType):
    users = graphene_sqlalchemy.SQLAlchemyConnectionField(
        User,
        active=graphene.Boolean()
    )

    def resolve_users(self, args, context, info):
        query = User.get_query(context)
        return query.filter_by(id=info.root_value.get('id'))


class AddUser(graphene.ClientIDMutation):

    class Input:
        username = graphene.String(required=True)
        email = graphene.String()

    ok = graphene.Boolean()
    user = graphene.Field(lambda: User)

    @classmethod
    def mutate_and_get_payload(cls, args, context, info):
        _input = args.copy()
        del _input['clientMutationId']
        new_user = UserModel(**_input)
        db.session.add(new_user)
        db.session.commit()
        ok = True
        if pubsub.subscriptions:
            pubsub.publish('users', new_user.as_dict())
        return AddUser(ok=ok, user=new_user)
```
Client (using react-apollo client):
```jsx
import React from 'react'
import ReactDOM from 'react-dom'
import { graphql, ApolloProvider } from 'react-apollo'
import gql from 'graphql-tag'
import ApolloClient, { createNetworkInterface } from 'apollo-client'
import { SubscriptionClient, addGraphQLSubscriptions } from 'subscriptions-transport-ws'
import ChatApp from './screens/ChatApp'
import ListBox from '../components/ListBox'

const SUBSCRIPTION_QUERY = gql`
  subscription newUsers {
    users(active: true) {
      edges {
        node {
          id
          username
        }
      }
    }
  }
`

const LIST_BOX_QUERY = gql`
  query AllUsers {
    users(active: true) {
      edges {
        node {
          id
          username
        }
      }
    }
  }
`

class ChatListBox extends React.Component {
  componentWillReceiveProps(newProps) {
    if (!newProps.data.loading) {
      if (this.subscription) {
        return
      }
      this.subscription = newProps.data.subscribeToMore({
        document: SUBSCRIPTION_QUERY,
        updateQuery: (previousResult, {subscriptionData}) => {
          const newUser = subscriptionData.data.users.edges
          const newResult = {
            users: {
              edges: [
                ...previousResult.users.edges,
                ...newUser
              ]
            }
          }
          return newResult
        },
        onError: (err) => console.error(err)
      })
    }
  }

  render() {
    return <ListBox data={this.props.data} />
  }
}

const ChatListBoxWithData = graphql(LIST_BOX_QUERY)(ChatListBox)
export default ChatListBoxWithData

const networkInterface = createNetworkInterface({
  uri: 'http://localhost:5000/graphql'
})

const wsClient = new SubscriptionClient(`ws://localhost:5000/socket`, {
  reconnect: true
})

const networkInterfaceWithSubscriptions = addGraphQLSubscriptions(
  networkInterface,
  wsClient,
)

const client = new ApolloClient({
  dataIdFromObject: o => o.id,
  networkInterface: networkInterfaceWithSubscriptions
})

ReactDOM.render(
  <ApolloProvider client={client}>
    <ChatApp />
  </ApolloProvider>,
  document.getElementById('root')
)
```
I'm very new to open source, so any critiques or pull requests are welcome.
@hballard checking it out. But wow, this was one of the major things holding me back.
Cool. Let me know your thoughts. I just pushed an update that added the keep_alive messages and the on_connect, on_disconnect, on_subscribe, and on_unsubscribe optional server setup functions. This should make it mostly the same as the Apollo subscriptions API at this point.
@globophobe Django Channels cannot do broadcasting over WebSockets by itself; you need Groups: https://channels.readthedocs.io/en/stable/concepts.html#groups
@japrogramer You're right, there's an extra step: the client would need to be added to a group. I don't have time (nor ever will) to explore this. Personally I'm waiting for https://github.com/encode/uvicorn and https://github.com/encode/apistar to bring good things to Django, or to obviate my need for it.
For anyone interested, this is how I implemented subscriptions with channels: https://github.com/graphql-python/graphene/pull/500#issuecomment-325560994
I just collaborated with @IlyaRadinsky on a graphene/Tornado/subscriptions integration with websockets that requires one line of code to get your graphene schema + subscriptions up and available as an API. Here's a link to his original repo, which has incorporated my changes as well: https://github.com/IlyaRadinsky/tornadoql
@miketout awesome, lots of interesting techniques and code there, will definitely be taking a look.
I'm about to start digging into subscription support probably tomorrow, but I'm looking at the code here (graphql-python test code)
Here's a very simple gist of how to use subscriptions with graphene. In a nutshell, it's probably easiest to return an `rx.Observable` using the pattern in the implementation above, though returning an `AsyncIterator` is also allowed. `rx` is a dependency of `graphene` anyway, meaning it's not an additional requirement you'll need to install, and it works on py2 and py3.
As you're probably using this for a web view, you'll actually want to save that subscriber and store it in a dict so a user can unsubscribe from it in the future. And you'll probably also want to use websockets, socket.io, etc., so you can actually push the subscription data out instead of appending it to a list as I've done on that same line.
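That bookkeeping could be sketched like this (a pure-stdlib stand-in; a real `rx.Observable.subscribe()` call returns a disposable with a similar `dispose()` method, and the keys here are made up):

```python
# Hypothetical sketch: keep each user's subscription handle in a dict so
# it can be looked up and disposed when the client unsubscribes.

class Subscription:
    def __init__(self, registry, key):
        self.registry, self.key, self.active = registry, key, True

    def dispose(self):
        # Stop delivering and drop the handle from the registry.
        self.active = False
        self.registry.pop(self.key, None)

subscriptions = {}  # e.g. keyed by (user_id, operation_id)

def subscribe(user_id, operation_id):
    sub = Subscription(subscriptions, (user_id, operation_id))
    subscriptions[(user_id, operation_id)] = sub
    return sub

sub = subscribe('user-1', 'op-1')
sub.dispose()  # called when the client sends an "unsubscribe" message
print(subscriptions)  # → {}
```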
@dfee Any ideas on how to use django signals with observables? I was thinking of doing the same thing you are doing. Take a look here:
```python
subject = rx.subjects.Subject()

# and in the signal consumer have
@receiver(post_save, sender=Model)
def send_update_product(sender, instance, created, *args, **kwargs):
    # I think I might be misunderstanding where I should put this line
    subject.on_next((instance.pk, created))
```
```python
def send(result, message):
    data = result.data
    message.reply_channel.send({
        'text': str({'data': json.loads(json.dumps(data))})
    })


@channel_session
def ws_GQLData(message):
    clean = json.loads(message.content['text'])
    query = clean.get('query')
    foovar = clean.get('variables')
    message.dataloaders = DataLoaders(get_language())
    kwargs = {'context_value': message}
    # TODO: Implement weight: can this query run for this user, or is it too expensive? <10-11-17>
    # TODO: Implement timeout mechanism <10-11-17>
    result = schema.execute(query, variable_values=foovar, allow_subscriptions=True, **kwargs)
    if isinstance(result, rx.Observable):

        class MyObserver(rx.Observer):

            def on_next(self, x):
                send(x, message)

            def on_error(self, e):
                ...

            def on_completed(self):
                ...

        result.subscribe(MyObserver())
    else:
        send(result, message)
```
The problem I see here is that if the signal is fired from a Django instance the user didn't subscribe to, the user won't see the message.
At the moment I'm sending the message in the signal with channel groups, but that doesn't take advantage of observables, and the user has to re-run the query to get the info they wanted.
@japrogramer I honestly haven't used Django since 1.4 (about 6 years ago), so I don't have too much to offer about specifics.
However, my hunch is that Django's signals concept and the rx concept are pretty similar in theory, but quite different in implementation. You'll probably need some glue to connect the `rx.Observable` object and the Django signal.
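A hedged sketch of what that glue might look like, with stdlib stand-ins (`Subject` mirrors only the small part of `rx.subjects.Subject` used here, and `on_model_saved` plays the role of a `@receiver(post_save, ...)` callback):

```python
# Hypothetical glue between a signal receiver and an observable stream.

class Subject:
    """Minimal stand-in for rx.subjects.Subject."""
    def __init__(self):
        self.observers = []

    def subscribe(self, on_next):
        self.observers.append(on_next)

    def on_next(self, value):
        for on_next in self.observers:
            on_next(value)

subject = Subject()

def on_model_saved(sender, instance, created, **kwargs):
    # In Django this function would be decorated with
    # @receiver(post_save, sender=Model); here we call it directly.
    subject.on_next((instance, created))

seen = []
subject.subscribe(seen.append)
on_model_saved(sender=None, instance={'pk': 1}, created=True)
print(seen)  # → [({'pk': 1}, True)]
```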
I did notice in your code that you're defining `class MyObserver`. If I'm imagining correctly that you lifted that from the tests here, I'd point out two things:
1) you probably don't need to define a class **
2) the test code isn't even using that class; it should be dropped from the file.
** see this note from the rxpy docs:
Most of the time you will not want to go through the verbosity of implementing your own Observer. You can instead pass 1 to 3 lambda arguments to subscribe() specifying the on_next, on_complete, and on_error actions.
```python
from rx import Observable

source = Observable.from_(["Alpha", "Beta", "Gamma", "Delta", "Epsilon"])

source.subscribe(on_next=lambda value: print("Received {0}".format(value)),
                 on_completed=lambda: print("Done!"),
                 on_error=lambda error: print("Error Occurred: {0}".format(error)))
```
@dfee @japrogramer Wanted to share, since I finally have something going that seems to be consistently working. (And yes, I am excited, after what feels like fighting this forever.)
I have a simple pub mixin I include on any class that is going to publish. (This could probably be cleaned up.)
```python
class PublisherMixin(object):

    @property
    def channel_name(self):
        return self.cls_name + ':' + str(self.id) if hasattr(self, 'cls_name') else str(self.id)

    async def pub(self, payload, routing_key=None):
        logger.debug('pub triggered')
        loop = asyncio.get_event_loop()
        if routing_key is None:
            routing_key = self.channel_name
        connection = await aio_pika.connect_robust('amqp://guest:guest@localhost/', loop=loop)
        channel = await asyncio.ensure_future(connection.channel())
        maybe = aio_pika.Message(body=pickle.dumps(payload))
        logger.debug(routing_key)
        await channel.default_exchange.publish(maybe, routing_key=routing_key)
```
This means that during my mutations I can just call something like `routing = [go find the right routing path key] + 'mutationName'` followed by `asyncio.ensure_future(this_mutated_object_doohickey.pub(payload=this_mutated_object_doohickey, routing_key=routing))`.
Again, there are probably more elegant ways to solve this, but it's working, and for now that feels fan-damn-tastic.
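The fire-and-forget publish from a mutation can be sketched without RabbitMQ by substituting an `asyncio.Queue` for the AMQP exchange (everything here is a hypothetical stand-in for the `pub` mixin above):

```python
import asyncio

queues = {}  # routing_key -> asyncio.Queue, a stand-in for RabbitMQ

def get_queue(routing_key):
    return queues.setdefault(routing_key, asyncio.Queue())

async def pub(payload, routing_key):
    # Stand-in for channel.default_exchange.publish(...)
    await get_queue(routing_key).put(payload)

async def mutate_team(team):
    team['name'] = 'renamed'  # the actual mutation work
    routing = team['channel'] + ':onTeamUpdated'
    # fire-and-forget, mirroring the asyncio.ensure_future(...) call above
    asyncio.ensure_future(pub(team, routing))
    return team

async def main():
    team = {'channel': 'Team:1', 'name': 'old'}
    queue = get_queue('Team:1:onTeamUpdated')
    await mutate_team(team)
    # The subscription side waits on the queue and receives the payload.
    return await queue.get()

result = asyncio.run(main())
print(result['name'])  # → renamed
```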
My subscription is just:

```python
class TeamSubscription(BaseSubscription):
    on_team_updated = Field(TeamType, input=UUID(), resolver=resolve.team_updated_subscription)
```
easy peasy standard args/query format.
The resolver:

```python
async def team_updated_subscription(root, info, **args):
    loop = asyncio.get_event_loop()
    connection = await aio_pika.connect_robust('amqp://guest:guest@localhost/', loop=loop)
    channel = await asyncio.ensure_future(connection.channel())
    x = args['input']
    lazy = LazySwitcher()
    p = lazy.project.get(root=root, eid=x, info=info)
    q_name = p.channel_name + ':onTeamUpdated'
    keh = await channel.declare_queue(q_name, auto_delete=True)
    while True:
        async for message in keh:
            m = pickle.loads(message.body)
            with message.process():
                yield m
```
lazy is just a mapping into lazy callables to get around some circular dependency issues. I have an object that maintains its own internal mapping so I can avoid circular import hell. Nothing special there.
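The actual `LazySwitcher` isn't shown, but a lazy-callable registry for breaking circular imports might look roughly like this (the registered factory and its return value are made up for illustration):

```python
# Hypothetical lazy-callable registry: nothing is resolved (or imported)
# until an attribute is first accessed, which sidesteps circular imports.

class LazySwitcher:
    def __init__(self):
        self._factories = {}
        self._cache = {}

    def register(self, name, factory):
        # factory is a zero-argument callable; in real code it could wrap
        # a deferred `from app.projects import ...` import.
        self._factories[name] = factory

    def __getattr__(self, name):
        if name not in self._factories:
            raise AttributeError(name)
        if name not in self._cache:
            self._cache[name] = self._factories[name]()  # resolve lazily
        return self._cache[name]

lazy = LazySwitcher()
lazy.register('project', lambda: {'get': lambda eid: 'project-%s' % eid})
print(lazy.project['get'](42))  # → project-42
```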
Using the base rabbit-alpine Docker image:

```yaml
version: '3.3'
services:
  roger:
    image: rabbitmq:3.7.6-alpine
    network_mode: host
    container_name: roger
```
https://github.com/heyrict/cindy-realtime was probably the most useful repository ever for referencing things
(Aside: to get the AioHttp subscription server working with middleware and authentication I had to subclass and override quite a bit. However, now that I've done it, I'm loving having most of my traffic over websockets. *For a hilarious read on subscriptions etc., start about 5 minutes into here.)
@ProjectCheshire Your solution looks a lot more elegant than what I ended up implementing. @dfee I got the observables part down: https://github.com/graphql-python/graphene/issues/430#issuecomment-396015394
My solution is so convoluted, and the graphql-core package doesn't like that I already have a loop running (https://github.com/graphql-python/graphene-django/issues/452): it raises an error but doesn't stop the execution of the subscription.
That leads to some issues I have found a workaround for. Unfortunately it leaves me with some undesired behavior, so instead of using the subscription result directly I use it to trigger a refetch of the actual query I'm interested in:
```js
this.subscription = this.props.data.subscribeToMore({
  document: gql`
    subscription ChangeInProduct($id: ID!) {
      subProduct(product: $id) {
        title
      }
    }
  `,
  variables: { id: this.props.match.params ? this.props.match.params.id : null },
  updateQuery: (prev, { subscriptionData }) => {
    if (!subscriptionData.data) {
      return prev;
    }
    console.log('interesting', prev, subscriptionData);
    this.props.data.refetch()
    return prev;
  },
});
```
JFYI: We have finally published our in-house subscriptions implementation, DjangoChannelsGraphqlWs. It is a simple WebSocket-based GraphQL server implemented on Django Channels. Subscriptions are implemented in a Graphene-like style. The work is still in progress, but perhaps someone will find it useful.
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
Disappointing that this was autoclosed as stale when websockets are a big part of the GraphQL spec.
The issue should be opened in graphene-django or flask-graphene, not pure graphene, because of the way Python web frameworks work.
How can I support subscriptions using graphene?