django / channels

Developer-friendly asynchrony for Django
https://channels.readthedocs.io
BSD 3-Clause "New" or "Revised" License
6.11k stars 800 forks source link

Cannot access scope["user"] object from an async consumer #1017

Closed theof closed 6 years ago

theof commented 6 years ago

Traceback:

2018-04-13 17:44:51,382 - ERROR - server - Exception inside application: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.
  File "/tmp/channels-examples/multichat/channels/consumer.py", line 54, in __call__
    await await_many_dispatch([receive, self.channel_receive], self.dispatch)
  File "/tmp/channels-examples/multichat/channels/utils.py", line 50, in await_many_dispatch
    await dispatch(result)
  File "/tmp/channels-examples/multichat/channels/consumer.py", line 67, in dispatch
    await handler(message)
  File "/tmp/channels-examples/multichat/channels/generic/websocket.py", line 173, in websocket_connect
    await self.connect()
  File "/tmp/channels-examples/multichat/chat/consumers.py", line 26, in connect
    if self.scope["user"].is_anonymous:
  File "/tmp/channels-examples/multichat/env/lib/python3.6/site-packages/django/utils/functional.py", line 215, in inner
    self._setup()
  File "/tmp/channels-examples/multichat/env/lib/python3.6/site-packages/django/utils/functional.py", line 349, in _setup
    self._wrapped = self._setupfunc()
  File "/tmp/channels-examples/multichat/channels/auth.py", line 142, in <lambda>
    scope["user"] = SimpleLazyObject(lambda: async_to_sync(get_user)(scope))
  File "/tmp/channels-examples/multichat/env/lib/python3.6/site-packages/asgiref/sync.py", line 34, in __call__
    "You cannot use AsyncToSync in the same thread as an async event loop - "
  You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.

Using AuthMiddleware: accessing the scope["user"] object from an asynchronous consumer triggers this error.

This happens since #923 was merged (published in Channels v2.1.0). The traceback can be reproduced with the following steps:

andrewgodwin commented 6 years ago

Just confirming that this is an error caused by the release of Channels 2.1, but it's also correct to complain. I am investigating possible fixes now.

In the meantime, if this is happening in your own code, you can replace scope["user"] with await channels.auth.get_user(scope)

andrewgodwin commented 6 years ago

OK, this should be fixed on master now. I would appreciate if someone else can verify that it is fixed, and then I'll issue a bugfix release.

Note that you should make sure to install daphne>=2.1.1 so that you get the change there that runs application constructors in a threadpool (as I've had to move the user fetch to be synchronous at application construction time)

theof commented 6 years ago

It seems to work fine for the channels-example. Well done !

andrewgodwin commented 6 years ago

OK this is released in channels 2.1.1.

mkarimim commented 6 years ago

I have same issue with new release. @andrewgodwin

2018-05-25 12:46:37,516 ERROR    Traceback (most recent call last):
  File "/home/project/venv/lib/python3.5/site-packages/daphne/ws_protocol.py", line 76, in onConnect
    "subprotocols": subprotocols,
  File "/home/project/venv/lib/python3.5/site-packages/daphne/server.py", line 184, in create_application
    application_instance = self.application(scope=scope)
  File "/home/project/venv/lib/python3.5/site-packages/channels/routing.py", line 56, in __call__
    return self.application_mapping[scope["type"]](scope)
  File "/home/project/venv/lib/python3.5/site-packages/channels/sessions.py", line 40, in __call__
    return self.inner(dict(scope, cookies=cookies))
  File "/home/project/venv/lib/python3.5/site-packages/channels/sessions.py", line 138, in __call__
    return SessionMiddlewareInstance(scope, self)
  File "/home/project/venv/lib/python3.5/site-packages/channels/sessions.py", line 161, in __init__
    self.inner = self.middleware.inner(self.scope)
  File "/home/project/venv/lib/python3.5/site-packages/channels/auth.py", line 143, in __call__
    scope["user"] = async_to_sync(get_user)(scope)
  File "/home/project/venv/lib/python3.5/site-packages/asgiref/sync.py", line 34, in __call__
    "You cannot use AsyncToSync in the same thread as an async event loop - "
RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.

my packages:

Django==1.11.11
channels==2.1.1
...

Thanks.

andrewgodwin commented 6 years ago

@mkarimim Did you also upgrade Daphne? You'll need a new version of that to do the threading for middleware.

mkarimim commented 6 years ago

Thank you for your reply, It's worked. Our daphne installed from channels dependencies (not specified in our dependencies). It's good to change the minimum version of daphne in channels to work properly.

Thanks.

andrewgodwin commented 6 years ago

Yes, I'll bump the Daphne requirement in Channels now.

RajaParikshit commented 5 years ago

I have something weird issue going on.

My packages

asgiref==3.2.1 celery==4.2.0 channels==2.2.0 channels-redis==2.4.0 daphne==2.3.0 Django==2.2.4 redis==3.3.7

This is my consumer class -

 class JobsConsumer(AsyncJsonWebsocketConsumer):

    async def connect(self):
        await self.accept()

    async def disconnect(self, code):
        pass

    async def receive_json(self, content, **kwargs):
        print(content)
        if content:
            if content['action'] == "start_sec3":
                await self.start_sec3(job_name=content['job_name'])

    async def start_sec3(self, job_name):
        print("Start_sec3 called")
        log.debug("job Name=%s", job_name)
        # Save model to our database
        job = Job(
            name=job_name,
            status="started",
        )
        database_sync_to_async(job.save())

        print("Job saved")

        # Start long running task here (using Celery)
        print("Celery called")
        sec3_task = sec3.delay(job.id, self.channel_name)

        # Store the celery task id into the database if we wanted to
        # do things like cancel the task in the future
        job.celery_id = sec3_task.id
        database_sync_to_async(job.save())

        print("Celery id saved")

        # Tell client task has been started

        print("Telling to client")

        await self.send_json({
                "action": "started",
                "job_id": job.id,
                "job_name": job.name,
                "job_status": job.status,
            })

    async def celery_job(self, data):
        await self.send_json(data)`

My Celery Task -

@shared_task
def sec3(job_id, reply_channel):
    # time sleep represent some long running process
    time.sleep(3)
    # Change task status to completed
    job = Job.objects.get(pk=job_id)
    log.debug("Running job_name=%s", job.name)
    print("Running job_name=%s", job.name)

    job.status = "completed"
    job.completed = datetime.now()
    job.save()

    # Send status update back to browser client
    if reply_channel is not None:
        print('Sending completed status.')
        print(reply_channel)
        channel_layer = channels.layers.get_channel_layer()

       # Problem occurs here (Sometimes)
        async_to_sync(channel_layer.send)(reply_channel, {
                "type": 'celery_job',
                "action": "completed",
                "job_id": job.id,
                "job_name": job.name,
                "job_status": job.status,
            })

When task are added with some delay of 1-2 seconds, everything works perfectly i.e job status is notified correctly by async_to_sync. But when task are added rapidly async_to_sync failed to notify by giving error

"You cannot use AsyncToSync in the same thread as an async event loop - "

According to this answer on SO - https://stackoverflow.com/a/53544010/8137534 async_to_sync should check outer_loop.

I'm not getting this outer_loop thing and why this happens?

shereengh commented 4 years ago

I have the same issue as @RajaParikshit and with the same versions of channels and daphne, can this issue be reopened and addressed? @andrewgodwin

pavitrakumar78 commented 3 years ago

@shereengh and @RajaParikshit Did you ever figure out a solution for this?

mssalnikov commented 2 years ago

I have this same issue when calling async_to_sync(chanel_layer.send) from celery task with channels 3.0.4 and daphne 3.02

danryu commented 2 years ago

Same issue here, also calling async_to_sync(channel_layer.send) from celery task. channels 2.4.0 daphne 2.5.0

mssalnikov commented 2 years ago

Can this issue be reopened? Or should we create a separate one?

mssalnikov commented 2 years ago

I've created a separate issue for this

https://github.com/django/channels/issues/1799