NilCoalescing / djangochannelsrestframework

A Rest-framework for websockets using Django channels-v4
https://djangochannelsrestframework.readthedocs.io/en/latest/
MIT License

[BUG] ObserverModelInstanceMixin not observable inside Celery task #197

Closed W0rtX closed 5 months ago

W0rtX commented 5 months ago

Describe the bug Hello, my friend. Thank you for your hard work in creating this wonderful library. I have one small question. When I run a Celery task, the observer does not work as I expect. When I change a task through the Django admin panel, everything works fine: the model change is immediately sent to my WebSocket. But when the change is made inside a Celery task, the notifications never arrive. I spent two days trying to find the reason, without success. I hope you can help me, my friend!

To Reproduce I won't go into too much depth; I'll describe the main points, which should be enough to understand the essence of the problem.

docker compose `
...etc...
redis:
  image: redis:alpine
  container_name: redis_development
  ports:

...etc...
`

Settings `
CELERY_BROKER_URL = 'redis://redis:6379/0'
CELERY_RESULT_BACKEND = 'redis://redis:6379/0'

# Channels
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            'hosts': ['redis://redis:6379/0'],
        },
    },
}
`

Consumer
`class StudyConsumer(ObserverModelInstanceMixin, GenericAsyncAPIConsumer, PatchModelMixin):
    queryset = Study.objects.all()
    serializer_class = StudiesSerializer

    @model_observer(Study)
    async def model_change(self, message, **kwargs):
        await self.send_json(message)

    @action()
    def retrieve(self, **kwargs) -> Tuple[ReturnDict, int]:
        instance = self.get_object(**kwargs)
        serializer = self.get_serializer(instance=instance, action_kwargs=kwargs)
        return serializer.data, status.HTTP_200_OK

    @action()
    def patch(self, data: dict, **kwargs) -> Tuple[ReturnDict, int]:
        instance = self.get_object(data=data, **kwargs)
        serializer = self.get_serializer(instance=instance, data=data, action_kwargs=kwargs, partial=True)
        serializer.is_valid(raise_exception=True)
        self.perform_patch(serializer, **kwargs)
        if getattr(instance, "_prefetched_objects_cache", None):
            instance._prefetched_objects_cache = {}

        if data['status'] == Study.PROCESSING:
            run.delay(instance.pk)  # Here I call the task

        return serializer.data, status.HTTP_200_OK`

Celery task:

def run(pk: str):
    study_record = Study.objects.get(id=pk)
    study_record.completed_at = datetime.datetime.now()
    study_record.status = Study.COMPLETED
    study_record.save(update_fields=['processing_time', 'result', 'status', 'started_at', 'completed_at'])
    return 'A processing celery task was complete!'

Expected behavior I expect changes made inside the Celery task to also trigger the observer and be sent to the WebSocket.

Thanks for any answer. Good luck!

W0rtX commented 5 months ago

class TestAppConfig(AppConfig):
    default_auto_field = 'django.db.models.BigAutoField'
    name = 'testapp'

    def ready(self):
        from OtherApp import consumers
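
A possible reading of the `ready()` snippet above: as far as I understand, `@model_observer` connects its Django signal receivers when the module defining the consumer is imported. A Celery worker is a separate process, and if it never imports `consumers`, a `save()` inside the task would find no receivers to call. Importing the module in `ready()` would make the registration run in every process that sets up Django, the worker included. The toy sketch below illustrates only that mechanism using the standard library; `RECEIVERS`, `model_observer`, `save`, and `import_consumers` are hypothetical stand-ins, not the real Django or djangochannelsrestframework API.

```python
# Toy model of import-time signal registration (NOT the real library API).

# Hypothetical per-process registry, standing in for Django's post_save receivers.
RECEIVERS = []


def model_observer(model_name):
    """Stand-in decorator: registers the function as soon as it is defined."""
    def decorator(func):
        RECEIVERS.append((model_name, func))
        return func
    return decorator


def save(model_name, pk):
    """Stand-in for Model.save(): calls every receiver registered for the model."""
    return [func(pk) for name, func in RECEIVERS if name == model_name]


def import_consumers():
    """Stand-in for `from OtherApp import consumers`: defining the observer registers it."""
    @model_observer("Study")
    def model_change(pk):
        return f"Study {pk} changed"
    return model_change


# Worker process that never imported the consumers module: nobody is listening.
before = save("Study", 1)

# What the import inside ready() is assumed to achieve: the registration runs here too.
import_consumers()
after = save("Study", 1)

print(before)  # []
print(after)   # ['Study 1 changed']
```

In the real project, the equivalent check would be whether the Celery worker process has imported the module containing `StudyConsumer` before the task calls `save()`.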