jaydenwindle / graphene-subscriptions

A plug-and-play GraphQL subscription implementation for Graphene + Django built using Django Channels.
MIT License
116 stars 15 forks

Behavior of module-level consumer stream #20

Open ashilen opened 4 years ago

ashilen commented 4 years ago

Hi Jayden. I see that #15 already raises this question and that you have a related PR open. I just want to confirm that the following behavior is expected, and that I'm not overlooking something.

  1. Some number of websocket connections, let's say 4, are open, and therefore 4 GraphqlSubscriptionConsumer instances exist.
  2. They each execute a schema with a subscription that resolves an event that's triggered by a model save signal and piped via the root stream. (So far so good.)
  3. When that signal is fired, each consumer receives a channel notification and calls on_next on the shared stream, and 4 events are pushed on to the stream.
  4. Because the stream is shared by all the consumers and their schemas, each subscription ends up resolving 4, rather than 1, events.
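
The four steps above can be reproduced with a minimal, dependency-free sketch. `SharedStream` and `Consumer` are hypothetical stand-ins for the module-level rx stream and `GraphqlSubscriptionConsumer`; they are not the library's classes, and the event string is made up.

```python
class SharedStream:
    """Hypothetical stand-in for the module-level rx stream."""

    def __init__(self):
        self._observers = []

    def subscribe(self, callback):
        self._observers.append(callback)

    def on_next(self, event):
        # Every observer sees every event pushed onto the stream.
        for callback in list(self._observers):
            callback(event)


stream = SharedStream()  # one stream shared by all consumers


class Consumer:
    """Hypothetical stand-in for one websocket consumer."""

    def __init__(self, name):
        self.name = name
        self.received = []
        # The subscription resolver listens on the shared stream.
        stream.subscribe(self.received.append)

    def signal_fired(self, event):
        # Each consumer gets the channel notification and re-publishes it.
        stream.on_next(event)


consumers = [Consumer(f"ws-{i}") for i in range(4)]

# One model save: every open consumer is notified once.
for consumer in consumers:
    consumer.signal_fired("saved:42")

counts = [len(c.received) for c in consumers]
print(counts)
```

With 4 consumers, each subscription sees the single save 4 times, because every consumer pushes its own copy onto the same stream.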

This seems like a bug to me. However, your reply to #15 doesn't explicitly address this point ("Aside from that, wouldn't stream_fired be called on the same event for each open websocket and cause the event to be delivered extra times to each consumer?"), so I'm not certain I'm not missing something. Moreover, correct me if I'm misunderstanding your PR, but since groups share a stream, it seems that resolvers subscribed to the same group will still receive 1 * (n open consumers publishing to the same group) event objects for each actual event. Am I missing something?

Thank you --

Just-Drue commented 4 years ago

I've noticed this as well, and had to hack my way around it with distinct()

jaydenwindle commented 4 years ago

Hey @ashilen! You're right, that is the current behavior with the global stream setup. In hindsight that was a poor design decision.

#7 fixes this by allowing you as a developer to control when groups are subscribed to and streams are created. When you call the subscribe method from inside of a GraphQL resolver, it will do the following:

  1. Add the current consumer to the specified group name
  2. Create a stream that is unique to that resolver / consumer / group name combo and store it on the consumer

Returning the created stream from the GraphQL resolver allows you to use rx operators to filter / map events that get sent to that group before they are returned to the consumer to send to the client.

This way, whenever you trigger a subscription using trigger_subscription, the following happens:

  1. trigger_subscription sends an event to the specified group name
  2. All consumers subscribed to that group receive the event and pass it on to the matching stream
  3. The stream is filtered by any rx logic present in the GraphQL consumer
  4. Stream data is sent to the client

So under this new model, a subscription resolver will create a unique stream for each actively subscribed client, and each client stream will receive 1 event object per event that is broadcast to the specified group.
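
A rough sketch of that model, under stated assumptions: the names `subscribe` and `trigger_subscription` come from the description above, but the signatures, the `GROUPS` registry, and the `Stream` and `Consumer` classes are invented here for illustration and may differ from the actual PR.

```python
from collections import defaultdict


class Stream:
    """Hypothetical per-resolver stream; a stand-in for an rx Subject."""

    def __init__(self):
        self.events = []

    def on_next(self, event):
        self.events.append(event)


GROUPS = defaultdict(list)  # hypothetical registry: group name -> consumers


class Consumer:
    """Hypothetical stand-in for one websocket consumer."""

    def __init__(self):
        self.streams = {}  # group name -> stream owned by this consumer

    def subscribe(self, group):
        # 1. Add this consumer to the group.
        if self not in GROUPS[group]:
            GROUPS[group].append(self)
        # 2. Create a stream unique to this consumer / group and store it.
        return self.streams.setdefault(group, Stream())

    def receive(self, group, event):
        # Pass the group event on to the matching stream only.
        stream = self.streams.get(group)
        if stream is not None:
            stream.on_next(event)


def trigger_subscription(group, event):
    # Broadcast once; each subscribed consumer receives it exactly once.
    for consumer in GROUPS[group]:
        consumer.receive(group, event)


consumers = [Consumer() for _ in range(4)]
streams = [c.subscribe("modelUpdated") for c in consumers]

trigger_subscription("modelUpdated", "saved:42")

counts = [len(s.events) for s in streams]
print(counts)
```

Because no stream is shared, the number of open consumers no longer multiplies the events each resolver sees.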

This allows you to further reduce the number of messages each consumer is required to handle by specifying unique group names based on GraphQL arguments when calling subscribe (see the new Model Updated docs for an example of a common use case for this).

The goal of this new design is to try and be as flexible as the pub/sub system used by JS GraphQL Subscriptions implementations, while still allowing you to use the power of rx in your resolvers.

Does that answer your question @ashilen?

@Just-Drue I'd be very interested to hear how you used distinct() to solve this problem :)

Just-Drue commented 4 years ago

@jaydenwindle hash the current object's properties and call it within the resolver

    # On the model class (called Model here): hash and compare by field values.
    def __key(self) -> tuple:
        return self.a, self.b, self.c

    def __hash__(self) -> int:
        return hash(self.__key())

    def __eq__(self, other: 'Model') -> bool:
        if isinstance(other, Model):
            return self.__key() == other.__key()
        return NotImplemented

    # In the subscription resolver: keep only instances whose hash is new.
    return root.filter(
        lambda event:
            event.operation == UPDATED and
            isinstance(event.instance, Model)
    ).map(lambda event: event.instance).distinct(lambda instance: hash(instance))
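
What a key-based `distinct` does to the duplicated events can be sketched without rx. The `distinct` generator and the `Instance` dataclass below are hypothetical stand-ins, not rx's operator or a Django model; a frozen dataclass is used because it derives `__hash__` and `__eq__` from its fields, much like the `__key` approach above.

```python
from dataclasses import dataclass


def distinct(events, key):
    """Yield only events whose key has not been seen before."""
    seen = set()
    for event in events:
        k = key(event)
        if k in seen:
            continue  # duplicate copy of an event already delivered
        seen.add(k)
        yield event


@dataclass(frozen=True)
class Instance:
    """Hypothetical model stand-in, hashed by its field values."""
    a: int
    b: str
    c: bool


# Two real updates, each delivered 4 times by the shared stream.
first = Instance(1, "x", True)
second = Instance(2, "y", False)
duplicated = [first] * 4 + [second] * 4

delivered = list(distinct(duplicated, key=hash))
print(len(delivered))
```

One caveat, as far as I understand rx's `distinct`: it remembers every key it has seen, so a later save that leaves the hashed fields unchanged would be dropped as well.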