datadvance / DjangoChannelsGraphqlWs

Django Channels based WebSocket GraphQL server with Graphene-like subscriptions
MIT License
281 stars 85 forks source link
django django-channels graphene graphql graphql-server graphql-subscriptions graphql-websocket-server python websockets

Django Channels based WebSocket GraphQL server with Graphene-like subscriptions

PyPI PyPI - Python Version PyPI - Downloads GitHub Release Date Travis CI Build Status GitHub Actions Tests Code style PyPI - License

Features

Installation

pip install django-channels-graphql-ws

Getting started

Create a GraphQL schema using Graphene. Note the MySubscription class.

import channels_graphql_ws
import graphene

class MySubscription(channels_graphql_ws.Subscription):
    """Simple GraphQL subscription."""

    # Leave only latest 64 messages in the server queue.
    notification_queue_limit = 64

    # Subscription payload.
    event = graphene.String()

    class Arguments:
        """That is how subscription arguments are defined."""
        arg1 = graphene.String()
        arg2 = graphene.String()

    @staticmethod
    def subscribe(root, info, arg1, arg2):
        """Called when user subscribes."""

        # Return the list of subscription group names.
        return ["group42"]

    @staticmethod
    def publish(payload, info, arg1, arg2):
        """Called to notify the client."""

        # Here `payload` contains the `payload` from the `broadcast()`
        # invocation (see below). You can return `None` if you wish to
        # suppress the notification to a particular client. For example,
        # this allows to avoid notifications for the actions made by
        # this particular client.

        return MySubscription(event="Something has happened!")

class Query(graphene.ObjectType):
    """Root GraphQL query."""
    # Graphene requires at least one field to be present. Check
    # Graphene docs to see how to define queries.
    value = graphene.String()
    async def resolve_value(self):
        return "test"

class Mutation(graphene.ObjectType):
    """Root GraphQL mutation."""
    # Check Graphene docs to see how to define mutations.
    pass

class Subscription(graphene.ObjectType):
    """Root GraphQL subscription."""
    my_subscription = MySubscription.Field()

graphql_schema = graphene.Schema(
    query=Query,
    mutation=Mutation,
    subscription=Subscription,
)

Make your own WebSocket consumer subclass and set the schema it serves:

class MyGraphqlWsConsumer(channels_graphql_ws.GraphqlWsConsumer):
    """Channels WebSocket consumer which provides GraphQL API."""
    schema = graphql_schema

    # Uncomment to send ping message every 42 seconds.
    # send_ping_every = 42

    # Uncomment to process requests sequentially (useful for tests).
    # strict_ordering = True

    async def on_connect(self, payload):
        """New client connection handler."""
        # You can `raise` from here to reject the connection.
        print("New client connected!")

Setup Django Channels routing:

application = channels.routing.ProtocolTypeRouter({
    "websocket": channels.routing.URLRouter([
        django.urls.path("graphql/", MyGraphqlWsConsumer.as_asgi()),
    ])
})

Notify clients when some event happens using the broadcast() or broadcast_sync() method from the OS thread where there is no running event loop:

MySubscription.broadcast(
    # Subscription group to notify clients in.
    group="group42",
    # Dict delivered to the `publish` method.
    payload={},
)

Notify clients in a coroutine function with async broadcast() or broadcast_async() method:

await MySubscription.broadcast(
    # Subscription group to notify clients in.
    group="group42",
    # Dict delivered to the `publish` method.
    payload={},
)

﹡) In case you are testing your client code by notifying it from the Django Shell, you have to setup a channel layer in order for the two instance of your application. The same applies in production with workers.

You should prefer async resolvers and async middleware over sync ones. Async versions will result in faster code execution. To do DB operations you can use Django 4 asynchronous queries.

Example

You can find simple usage example in the example directory.

Run:

cd example/
# Initialize database.
./manage.py migrate
# Create "user" with password "user".
./manage.py createsuperuser
# Run development server.
./manage.py runserver

Play with the API though the GraphiQL browser at http://127.0.0.1:8000.

You can start with the following GraphQL requests:


# Check there are no messages.
query read { history(chatroom: "kittens") { chatroom text sender }}

# Send a message from your session.
mutation send { sendChatMessage(chatroom: "kittens", text: "Hi all!"){ ok }}

# Check there is a message.
query read { history(chatroom: "kittens") { text sender } }

# Open another browser or a new incognito window (to have another
# session cookie) subscribe to make it wait for events.
subscription s { onNewChatMessage(chatroom: "kittens") { text sender }}

# Send another message from the original window and see how subscription
# triggers in the other one.
mutation send { sendChatMessage(chatroom: "kittens", text: "Something ;-)!"){ ok }}

Details

The channels_graphql_ws module provides the following key classes:

For details check the source code which is thoroughly commented. The docstrings of classes are especially useful.

Since the WebSocket handling is based on the Django Channels and subscriptions are implemented in the Graphene-like style it is recommended to have a look the documentation of these great projects:

The implemented WebSocket-based protocol was taken from the library graphql-ws which is used by the Apollo GraphQL. Check the protocol description for details.

NOTE: Prior to 1.0.0rc7 the library used another protocol: subscription-transport-ws (see the protocol description). In fact Apollo GraphQL has been based on this protocol for years, but eventually has switched to a new one, so we did this as well.

Automatic Django model serialization

The Subscription.broadcast uses Channels groups to deliver a message to the Subscription's publish method. ASGI specification clearly states what can be sent over a channel, and Django models are not in the list. Since it is common to notify clients about Django models changes we manually serialize the payload using MessagePack and hack the process to automatically serialize Django models following the the Django's guide Serializing Django objects.

Execution

Context and scope

The context object (info.context in resolvers) is a SimpleNamespace instance useful to transfer extra data between GraphQL resolvers. The lifetime of info.context corresponds to the lifetime of GraphQL request, so it does not persist content between different queries/mutations/subscriptions. It also contains some useful extras:

Authentication

To enable authentication it is typically enough to wrap your ASGI application into the channels.auth.AuthMiddlewareStack:

application = channels.routing.ProtocolTypeRouter({
    "websocket": channels.auth.AuthMiddlewareStack(
        channels.routing.URLRouter([
            django.urls.path("graphql/", MyGraphqlWsConsumer),
        ])
    ),
})

This gives you a Django user info.context.channels_scope["user"] in all the resolvers. To authenticate user you can create a Login mutation like the following:

class Login(graphene.Mutation, name="LoginPayload"):
    """Login mutation."""

    ok = graphene.Boolean(required=True)

    class Arguments:
        """Login request arguments."""

        username = graphene.String(required=True)
        password = graphene.String(required=True)

    def mutate(self, info, username, password):
        """Login request."""

        # Ask Django to authenticate user.
        user = django.contrib.auth.authenticate(username=username, password=password)
        if user is None:
            return Login(ok=False)

        # Use Channels to login, in other words to put proper data to
        # the session stored in the scope.
        asgiref.sync.async_to_sync(channels.auth.login)(info.context.channels_scope, user)
        # Save the session,cause `channels.auth.login` does not do this.
        info.context.session.save()

        return Login(ok=True)

The authentication is based on the Channels authentication mechanisms. Check the Channels documentation. Also take a look at the example in the example directory.

The Python client

There is the GraphqlWsClient which implements GraphQL client working over the WebSockets. The client needs a transport instance which communicates with the server. Transport is an implementation of the GraphqlWsTransport interface (class must be derived from it). There is the GraphqlWsTransportAiohttp which implements the transport on the AIOHTTP library. Here is an example:

transport = channels_graphql_ws.GraphqlWsTransportAiohttp(
    "ws://backend.endpoint/graphql/", cookies={"sessionid": session_id}
)
client = channels_graphql_ws.GraphqlWsClient(transport)
await client.connect_and_init()
result = await client.execute("query { users { id login email name } }")
users = result["data"]
await client.finalize()

See the GraphqlWsClient class docstring for the details.

The GraphiQL client

The GraphiQL provided by Graphene doesn't connect to your GraphQL endpoint via WebSocket; instead you should use a modified GraphiQL template under graphene/graphiql.html which will take precedence over the one of Graphene. One such modified GraphiQL is provided in the example directory.

Testing

To test GraphQL WebSocket API read the appropriate page in the Channels documentation.

In order to simplify unit testing there is a GraphqlWsTransport implementation based on the Django Channels testing communicator: channels_graphql_ws.testing.GraphqlWsTransport. Check its docstring and take a look at the tests to see how to use it.

Subscription activation confirmation

The original Apollo's protocol does not allow client to know when a subscription activates. This inevitably leads to the race conditions on the client side. Sometimes it is not that crucial, but there are cases when this leads to serious issues. Here is the discussion in the subscriptions-transport-ws tracker.

To solve this problem, there is the GraphqlWsConsumer setting confirm_subscriptions which when set to True will make the consumer issue an additional next message which confirms the subscription activation. Please note, you have to modify the client's code to make it consume this message, otherwise it will be mistakenly considered as the first subscription notification.

To customize the confirmation message itself set the GraphqlWsConsumer setting subscription_confirmation_message. It must be a dictionary with two keys "data" and "errors". By default it is set to {"data": None, "errors": None}.

GraphQL middleware

It is possible to inject middleware into the GraphQL operation processing. For that define middleware setting of your GraphqlWsConsumer subclass, like this:

async def threadpool_for_sync_resolvers(next_middleware, root, info, *args, **kwds):
    """Offload synchronous resolvers to the threadpool.

    This middleware should always be the last in the middlewares calls
    stack and the closest to the real resolver. If this middleware is
    not the last it will check the next middleware to call instead of
    real resolver. The first middleware in the middlewares list will be
    the closest to the resolver.
    """
    # Invoke next middleware.
    if asyncio.iscoroutinefunction(next_middleware):
        result = await next_middleware(root, info, *args, **kwds)
    else:
        result = await asyncio.to_thread(next_middleware, root, info, *args, **kwds)
    return result

class MyGraphqlWsConsumer(channels_graphql_ws.GraphqlWsConsumer):
    ...
    middleware = [threadpool_for_sync_resolvers]

It is recommended to write asynchronous middlewares. But synchronous middlewares are also supported:

def my_middleware(next_middleware, root, info, *args, **kwds):
    """My custom GraphQL middleware."""
    # Invoke next middleware.
    return next_middleware(root, info, *args, **kwds)

class MyGraphqlWsConsumer(channels_graphql_ws.GraphqlWsConsumer):
    ...
    middleware = [my_middleware]

For more information about GraphQL middleware please take a look at the relevant section in the Graphene documentation.

Alternatives

There is a Tomáš Ehrlich GitHubGist GraphQL Subscription with django-channels which this implementation was initially based on.

There is a promising GraphQL WS library by the Graphene authors. In particular this pull request gives a hope that there will be native Graphene implementation of the WebSocket transport with subscriptions one day.

Development

Bootstrap

A reminder of how to setup an environment for the development.

  1. Install PyEnv to be able to work with many Python versions at once PyEnv→Installation.

  2. Install Python versions needed. The command should be executed in the project's directory:

    $ pyenv local | xargs -L1 pyenv install
  3. Check that pyenv works correctly. The command:

    $ pyenv versions

    should show python versions enlisted in .python-version. If everything is set up correctly pyenv will switch version of python when you enter and leave the project's directory. Inside the directory pyenv which python should show you a python installed in pyenv, outside the dir it should be the system python.

  4. Install Poetry (https://python-poetry.org/docs/#installation).

    $ pipx install poetry
  5. Create local virtualenv in .venv, install all project dependencies (from pyproject.toml) except the project itself.

    $ poetry install --no-root
  6. Activate virtualenv There are options:

    • With Poetry:
      $ poetry shell
    • Manually:
      $ source .venv/bin/activate
    • With VS Code: Choose .venv with "Python: Select interpreter" and reopen the terminal.
  7. Upgrade Pip:

    $ pip install --upgrade pip

Use:

Code style: black

Where to start reading the code

The code is inherently complex because it glues two rather different libraries/frameworks Channels and Graphene. You might need some time to dive into. Here are some quick insights to help you to get on track.

The main classes are GraphqlWsConsumer and Subscription. The former one is a Channels consumer which instantiates each time a WebSocket connection establishes. User (of the library) subclasses it and tunes settings in the successor class. The latter is from the Graphene world. Both classes are tightly coupled. When client subscribes an instance of GraphqlWsConsumer subclass holding the WebSocket connection passes to the Subscription.

To better dive in it is useful to understand in general terms how regular request are handled. When server receives JSON from the client, the GraphqlWsConsumer.receive_json method is called by Channels routines. Then the request passes to the _on_gql_subscribe method which handles GraphQL message "SUBSCRIBE". Most magic happens there.

Running tests

A reminder of how to run tests.

Making release

A reminder of how to make and publish a new release.

  1. Merge all changes to the master branch and switch to it.
  2. Update version: poetry version minor.
  3. Update CHANGELOG.md.
  4. Update README.md (if needed).
  5. Commit changes made above.
  6. Git tag: git tag vX.X.X && git push --tags.
  7. Publish release to PyPI: poetry publish --build.
  8. Update release notes on GitHub.

Contributing

This project is developed and maintained by DATADVANCE LLC. Please submit an issue if you have any questions or want to suggest an improvement.

Acknowledgements

This work is supported by the Russian Foundation for Basic Research (project No. 15-29-07043).