[Feature request] Dynamic subscription filtering through query params. #179

AndreaMascoli commented 1 year ago

Describe the bug: I want to filter the consumer's queryset so that it observes only a subset of a model's instances, and print a message to the console containing the instance values after every change. I want to pass the filters in the WebSocket connection URL.

To Reproduce: Steps to reproduce the behavior:

  1. Install all the packages needed to establish a WebSocket connection and implement a consumer
  2. Create a model, a serializer, a viewset
  3. Create the consumer of that model
  4. Define routes
  5. Implement an HTML page that reports all changes to some instances in real time

Expected behavior: I will add some query parameters to the WebSocket connection URL. The consumer will observe only the instances whose values match those filters, and it will send the client a message containing the changed instances' information.

Additional context: I can't find anything in the docs that works like this.

hishnash commented 1 year ago

Due to how Django signals work, it is not possible to set up an observation ad hoc.

What you can do is figure out in advance what types of data filtering you will need. The best way to think about it is how you would group your data for each model.

E.g. a user might want to subscribe to all posts on a social network that include their username. This implies that when a post is created or updated, a message needs to be broadcast to a channel group for each username mentioned in the post. If the user is online, they can subscribe to the group for their username and will thus get a message when someone else creates a post that mentions them.

Of course, a single model instance can map to many such groups: the same social media post would likely also need to broadcast on a group for each hashtag in the post.
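To make the instance-to-group mapping concrete, here is a minimal pure-Python sketch of the social-network example. The `-mention__…` / `-hashtag__…` naming scheme and both function names are hypothetical; the only real requirement is that the broadcasting side and the subscribing side build identical group-name strings.

```python
import re
from typing import Iterator

# Hypothetical group-name format. re.ASCII keeps names to ASCII word characters,
# since Channels only accepts ASCII alphanumerics, hyphens, underscores and periods.
MENTION_RE = re.compile(r"@(\w+)", re.ASCII)
HASHTAG_RE = re.compile(r"#(\w+)", re.ASCII)


def groups_for_post(text: str) -> Iterator[str]:
    """Yield one group per distinct mention and hashtag in a post's text."""
    # dict.fromkeys removes duplicates while keeping first-seen order
    for username in dict.fromkeys(MENTION_RE.findall(text)):
        yield f"-mention__{username}"
    for tag in dict.fromkeys(HASHTAG_RE.findall(text)):
        yield f"-hashtag__{tag.lower()}"


def groups_for_user(username: str) -> Iterator[str]:
    """The group an online user joins to hear about posts that mention them."""
    yield f"-mention__{username}"
```

A post such as `"hi @alice and @bob #Django @alice"` maps to three groups, and `alice` receives it because her subscription group is one of them.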

The key thing to remember here is that you need to define this mapping from model instance to group in code. When a WebSocket connection is opened (via the URL or by sending a WS message), you can then dynamically subscribe to different groups, but you can't create new groups that you did not account for in code.

If your use case does not expect lots of concurrent users, you can filter on the consumer side and have the consumer skip updates that do not match the filter. This gives you more dynamic filtering, but any DB filtering on the consumer side will be very costly if you have lots of connected clients, since each consumer will need to run that query every time there is an update.
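As a sketch of consumer-side filtering that avoids DB queries, each consumer can compare the already-serialized payload against its own filters. The function name `matches_filters` is hypothetical; values are compared as strings because query-string parameters always arrive as text.

```python
from typing import Mapping, Optional


def matches_filters(filters: Mapping[str, Optional[str]], data: Mapping[str, object]) -> bool:
    """Return True if every supplied (non-empty) filter equals the matching field.

    Filters that are None or empty are ignored, so an empty filter set matches
    every instance. No database access happens here.
    """
    return all(
        str(data.get(field)) == expected
        for field, expected in filters.items()
        if expected
    )
```

Every consumer still receives every broadcast and pays for this check, which is why the group-based approach scales better with many clients.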

Could you give a more concrete example of your data model that you want to query and what sort of queries you need to do?

AndreaMascoli commented 1 year ago

Hello hishnash, thank you for your answer. Actually my case is very basic: I have a custom class Dataset with many fields like 'description', 'status'... I've implemented the consumer with djangochannelsrestframework. What I want to achieve is to filter the Dataset instances observed by the consumer through some query params (for example id, status, licence and project), in order to send a message containing the state of those instances every time their values change. So far I've implemented, with the model_observer decorator, a consumer that observes every instance, and the filtering is applied to the messages the client would receive. I was looking for a filtering mechanism, if one exists, that works on the consumer's queryset rather than on the messages. Here is an implementation of the consumer:

```python
from urllib.parse import parse_qs

from djangochannelsrestframework import permissions
from djangochannelsrestframework.generics import GenericAsyncAPIConsumer
from djangochannelsrestframework.mixins import ListModelMixin
from djangochannelsrestframework.observer import model_observer

from .models import Dataset
from .serializers import DatasetSerializer


class DatasetConsumer(ListModelMixin, GenericAsyncAPIConsumer):
    queryset = Dataset.objects.all()
    serializer_class = DatasetSerializer
    permission_classes = (permissions.BasePermission,)

    async def connect(self, **kwargs):
        # Read the optional filters from the WebSocket URL, e.g. ?id=1&status=open
        query_string = self.scope['query_string'].decode()
        params = parse_qs(query_string)
        self.param_data = {
            'id': params.get('id', [None])[0],
            'project': params.get('project', [None])[0],
            'status': params.get('status', [None])[0],
        }
        await self.model_change.subscribe()
        await super().connect()

    @model_observer(Dataset)
    async def model_change(self, message, observer=None, **kwargs):
        model_instance = message['data']
        message_data = {
            'id': str(model_instance['id']),
            'project': str(model_instance['project']),
            'status': str(model_instance['status']),
        }
        # Forward the change only if every filter that was supplied matches
        if all(
            not expected or message_data[field] == expected
            for field, expected in self.param_data.items()
        ):
            await self.send_json(message)

    @model_change.serializer
    def model_change(self, instance, action, **kwargs):
        return dict(data=DatasetSerializer(instance=instance).data, action=action.value)
```

hishnash commented 1 year ago

Your solution is a valid way of doing this; however, if you have lots and lots of connected clients, there is a better solution using groups.

What you can do is create a group for each of the things you could query for. Then, whenever an object changes, the relevant groups are notified.

```python
# Inside DatasetConsumer, below the @model_observer(Dataset) handler

@model_change.groups_for_signal
def model_change(self, instance: Dataset, **kwargs):
    # this block of code is called very often *DO NOT make DB QUERIES HERE*
    yield f'-pk__{instance.pk}'
    yield f'-project__{instance.project_id}'  # for relationships use the `_id` field to avoid extra db queries
    yield f'-status__{instance.status}'

@model_change.groups_for_consumer
def model_change(self, dataset=None, project=None, status=None, **kwargs):
    # This is called when you subscribe/unsubscribe
    if project is not None:
        yield f'-project__{project.pk}'
    if dataset is not None:
        yield f'-pk__{dataset.pk}'
    if status is not None:
        yield f'-status__{status}'
```
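To see what this one-group-per-field scheme means for a client, here is a pure-Python sketch that mirrors the two `model_change` group functions above without Django. The function names are hypothetical; the point is that a change reaches a consumer through any group the two sides share, so multiple filters behave like OR rather than AND.

```python
from typing import Iterator, Optional, Set


def groups_for_instance(pk: int, project_id: int, status: str) -> Iterator[str]:
    """One group per field, mirroring the signal-side function."""
    yield f"-pk__{pk}"
    yield f"-project__{project_id}"
    yield f"-status__{status}"


def groups_for_subscription(
    dataset_pk: Optional[int] = None,
    project_pk: Optional[int] = None,
    status: Optional[str] = None,
) -> Iterator[str]:
    """One group per supplied filter, mirroring the consumer-side function."""
    if project_pk is not None:
        yield f"-project__{project_pk}"
    if dataset_pk is not None:
        yield f"-pk__{dataset_pk}"
    if status is not None:
        yield f"-status__{status}"


def shared_groups(subscription: Set[str], instance: Set[str]) -> Set[str]:
    """Groups through which a change to the instance reaches this consumer."""
    return subscription & instance
```

A client filtering on `project=7` and `status=open` still hears about a closed dataset in project 7, via the project group alone.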

Then, in your connect method, subscribe:

```python
from urllib.parse import parse_qs

from channels.db import database_sync_to_async

from .models import Project  # assumed location of the Project model

async def connect(self, **kwargs):
    query_string = self.scope['query_string'].decode()
    params = parse_qs(query_string)
    dataset_pk = params.get('id', [None])[0]
    project_pk = params.get('project', [None])[0]
    status = params.get('status', [None])[0]

    # TODO: catch errors and decide what to do if an object can't be found in the DB
    dataset = None
    if dataset_pk is not None:
        dataset = await database_sync_to_async(self.get_object)(pk=dataset_pk)

    project = None
    if project_pk is not None:
        project = await database_sync_to_async(Project.objects.get)(pk=project_pk)

    await self.model_change.subscribe(dataset=dataset, project=project, status=status)
    await super().connect()
```

With this done, you can remove the filtering from your main handler, as it should only be called for objects in these groups.
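The query-string handling in `connect` can be pulled into a small helper and tested on its own. This is a sketch with a hypothetical name, `parse_filters`; it relies on the ASGI scope's `query_string` being bytes and on `parse_qs` dropping blank values by default, so `?status=` is treated the same as a missing filter.

```python
from typing import Dict, Iterable, Optional
from urllib.parse import parse_qs


def parse_filters(
    query_string: bytes,
    names: Iterable[str] = ("id", "project", "status"),
) -> Dict[str, Optional[str]]:
    """Return the first value of each expected parameter, or None if absent."""
    # parse_qs maps each key to a list of values and skips empty values
    params = parse_qs(query_string.decode())
    return {name: params.get(name, [None])[0] for name in names}
```

Inside `connect` this would be called as `parse_filters(self.scope['query_string'])`.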

Note: if you use this method and a user is subscribed to status A, then when a dataset with status A is updated to status B, the user will get a message saying the dataset has been deleted (since, from that user's point of view, the dataset no longer exists). This causes problems when a single user is subscribed to multiple groups at once: a dataset could be updated to a new status and, at the same time, moved into a project the user is subscribed to, which results in 2 messages being sent.
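The double-message case can be reproduced with a simplified pure-Python model. It assumes the observer compares the groups an instance belonged to before a save with the groups after it: groups it left receive a delete, groups it stayed in an update, and groups it joined a create. This is a sketch of that idea, not djangochannelsrestframework's actual code, and `actions_for_consumer` is a hypothetical name.

```python
from typing import Dict, Set


def actions_for_consumer(
    subscribed: Set[str], old_groups: Set[str], new_groups: Set[str]
) -> Dict[str, str]:
    """Map each of the consumer's groups touched by a change to the action it sees."""
    actions: Dict[str, str] = {}
    for group in subscribed & (old_groups - new_groups):
        actions[group] = "delete"  # the instance left this group
    for group in subscribed & (old_groups & new_groups):
        actions[group] = "update"  # the instance stayed in this group
    for group in subscribed & (new_groups - old_groups):
        actions[group] = "create"  # the instance joined this group
    return actions


# Dataset 1 moves from project 3 / status A to project 7 / status B
subscribed = {"-status__A", "-project__7"}
old = {"-pk__1", "-project__3", "-status__A"}
new = {"-pk__1", "-project__7", "-status__B"}
print(actions_for_consumer(subscribed, old, new))
```

The same change arrives twice, once as a delete (via the status group) and once as a create (via the project group).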

hishnash commented 1 year ago

After waking up this morning I think you should do something a little bit different.

Instead of subscribing to one group per project and one group per status, subscribe to a single group that combines both when the user provides both, and to the respective single group when they provide only one. This gives you the correct filtering, because your filter above combines the conditions with `and`, not `or`.

So the changes would be:

```python
@model_change.groups_for_signal
def model_change(self, instance: Dataset, **kwargs):
    # this block of code is called very often *DO NOT make DB QUERIES HERE*
    # group names always list the fields in the same order: pk, project, status
    yield f'-pk__{instance.pk}'
    yield f'-project__{instance.project_id}'  # for relationships use the `_id` field to avoid extra db queries
    yield f'-status__{instance.status}'
    yield f'-pk__{instance.pk}__status__{instance.status}'
    yield f'-pk__{instance.pk}__project__{instance.project_id}'
    yield f'-pk__{instance.pk}__project__{instance.project_id}__status__{instance.status}'
    yield f'-project__{instance.project_id}__status__{instance.status}'

@model_change.groups_for_consumer
def model_change(self, dataset=None, project=None, status=None, **kwargs):
    # subscribe to exactly one group: the one combining every filter supplied
    if dataset is not None and project is not None and status is not None:
        yield f'-pk__{dataset.pk}__project__{project.pk}__status__{status}'
    elif dataset is not None and project is not None:
        yield f'-pk__{dataset.pk}__project__{project.pk}'
    elif dataset is not None and status is not None:
        yield f'-pk__{dataset.pk}__status__{status}'
    elif status is not None and project is not None:
        yield f'-project__{project.pk}__status__{status}'
    elif status is not None:
        yield f'-status__{status}'
    elif project is not None:
        yield f'-project__{project.pk}'
    elif dataset is not None:
        yield f'-pk__{dataset.pk}'
```
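The seven hand-written signal-side groups are every non-empty combination of the three fields, so they can also be generated. The sketch below is a pure-Python illustration with hypothetical names (`FIELDS`, `group_name`, and so on); a fixed field order guarantees that both sides build the same string. Keep in mind that Channels only accepts group names made of ASCII alphanumerics, hyphens, underscores and periods, shorter than 100 characters, so status values containing spaces or other characters would need escaping.

```python
from itertools import combinations
from typing import Iterator, Mapping, Optional, Sequence

# Fixed order so both sides build identical names, e.g. "-pk__1__status__open"
FIELDS: Sequence[str] = ("pk", "project", "status")


def group_name(values: Mapping[str, object]) -> str:
    """Join the supplied fields, in FIELDS order, into one group name."""
    return "-" + "__".join(f"{field}__{values[field]}" for field in FIELDS if field in values)


def groups_for_instance(values: Mapping[str, object]) -> Iterator[str]:
    """Yield one group per non-empty combination of fields (7 for 3 fields)."""
    for size in range(1, len(FIELDS) + 1):
        for combo in combinations(FIELDS, size):
            yield group_name({field: values[field] for field in combo})


def group_for_subscription(filters: Mapping[str, Optional[object]]) -> Optional[str]:
    """Return the single group matching exactly the filters supplied, if any."""
    provided = {field: filters[field] for field in FIELDS if filters.get(field) is not None}
    return group_name(provided) if provided else None
```

A client filtering on project 7 and status `open` joins only `-project__7__status__open`, which a dataset emits only when both values match, giving AND semantics.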

This way the notification actions (create, update or delete) will be correct.

If a user subscribes to just an id, they will be notified when the object with that id is updated or deleted.

If a user subscribes to just a status, they will get a create notification whenever a dataset is created with that status or an existing dataset is updated to that status; an update notification whenever such a dataset is updated; and a delete notification when the dataset is deleted or its status changes to a different one (thereby leaving this channel group).

If a user supplies just a project, it works the same way as status.

If a user supplies project and status, these are ANDed together.

And if a user supplies project, status and id, these are all ANDed together.


Is this what you expected?

AndreaMascoli commented 1 year ago

Thank you very much for your support; this implementation is what I expected. However, I have one last question. With this consumer code, is it also possible to implement a mechanism so that, if no filters are provided (they are all None), the consumer observes changes to all instances, i.e. the whole Dataset queryset without filtering?

hishnash commented 1 year ago

Yes, you can just yield another group, `'-all'`, in `groups_for_signal`, and in `groups_for_consumer`, if the user has not provided any query parameters, yield that same group back.
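A minimal pure-Python sketch of the consumer side of that fallback, with a hypothetical function name and the `'-all'` group from the reply. It assumes the signal side also yields `'-all'` for every Dataset instance, in addition to its other groups.

```python
from typing import Iterator, Optional

ALL_GROUP = "-all"  # the signal side must also yield this for every instance


def groups_for_consumer(
    dataset_pk: Optional[int] = None,
    project_pk: Optional[int] = None,
    status: Optional[str] = None,
) -> Iterator[str]:
    """Yield one combined group for the filters supplied, or the catch-all group."""
    filters = {"pk": dataset_pk, "project": project_pk, "status": status}
    parts = [f"{field}__{value}" for field, value in filters.items() if value is not None]
    if parts:
        yield "-" + "__".join(parts)  # fields in pk, project, status order
    else:
        yield ALL_GROUP  # no filters: listen to every Dataset change
```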

AndreaMascoli commented 1 year ago

Thank you very much for your support. :)