django / channels

Developer-friendly asynchrony for Django
https://channels.readthedocs.io
BSD 3-Clause "New" or "Revised" License

Can't call send on channel layer from celery task #1799

Closed mssalnikov closed 2 years ago

mssalnikov commented 2 years ago

Following up on this closed issue: https://github.com/django/channels/issues/1017

Trying to call async_to_sync(channel_layer.send) from a Celery task causes an error:

You cannot use AsyncToSync in the same thread as an async event loop

I'm on channels 3.0.4 and daphne 3.0.2, so, contrary to what's said in that thread, the issue hasn't been fixed.
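
The message comes from a guard in asgiref's async_to_sync: when it detects an event loop already running in the calling thread, it refuses to block that thread. The simplified sketch below reproduces only that check with the standard library, to show when the message appears. `run_sync` and `fake_send` are hypothetical names; this is not asgiref's actual implementation, which does considerably more.

```python
import asyncio
from typing import Any, Awaitable, Callable


def run_sync(async_fn: Callable[..., Awaitable[Any]], *args: Any) -> Any:
    """Hypothetical, simplified stand-in for the guard in async_to_sync.

    Not asgiref's implementation: it only checks for a running loop in
    the current thread, then runs the coroutine on a fresh event loop.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        loop_running = False  # no loop in this thread: safe to block
    else:
        loop_running = True
    if loop_running:
        # Blocking here would stall the loop that is calling us.
        raise RuntimeError(
            "You cannot use AsyncToSync in the same thread as an async event loop"
        )
    return asyncio.run(async_fn(*args))


async def fake_send(channel: str, message: dict) -> str:
    # Hypothetical placeholder for channel_layer.send.
    await asyncio.sleep(0)
    return f"sent {message['type']} to {channel}"


async def call_from_inside_loop() -> str:
    # Same synchronous call, but made while asyncio.run() drives this coroutine.
    try:
        run_sync(fake_send, "chan", {"type": "chat.message"})
    except RuntimeError as exc:
        return f"RuntimeError: {exc}"
    return "no error"


if __name__ == "__main__":
    print(run_sync(fake_send, "chan", {"type": "chat.message"}))
    print(asyncio.run(call_from_inside_loop()))
```

From plain synchronous code the call succeeds; the identical call made while a loop is running in the same thread raises the error quoted above.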

carltongibson commented 2 years ago

@mssalnikov You’re going to need to dig in a bit deeper and see what’s happening here. Ideally we need a minimal reproduction. It’s impossible to say anything without more detail.

I’ll close this, but happy to reopen if you can show what’s going on properly.

mssalnikov commented 2 years ago

I have exactly the same case as in the comment https://github.com/django/channels/issues/1017#issuecomment-523014937, but with the latest versions of the libraries.

udbhav commented 2 years ago

I'm also experiencing this issue with latest versions of relevant libraries.

I think this might be something on Celery's end, so I've opened a ticket there: https://github.com/celery/celery/issues/7465. I've also set up a minimal reproducible test case: https://github.com/udbhav/celery-channels-test

Hope that helps, and thank you.

udbhav commented 2 years ago

After investigating further, I think it might actually be something in how channels interacts with gevent. I added a new test to the repo with a Celery task that doesn't call channels and instead only detects whether an event loop is running: no event loop is created. However, as soon as I add an async_to_sync(channel_layer.group_send) call, running multiple tasks triggers the runtime error. My fairly uneducated guess is that asgiref creates temporary event loops when calling async_to_sync, and these event loops occasionally aren't fully closed when gevent is managing multiple greenlets.

EDIT: Experimenting more with my test case repo, there are no problems on Python 3.6; the issue only starts happening with Python 3.7. It also doesn't happen with the prefork Celery pool, only with gevent. For my case, I'm going to create separate tasks that call channels' group_send and run them in their own prefork worker pool.
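
The check described above (detecting, from inside a task, whether an event loop is running) can be sketched with the standard library. `loop_is_running` is a hypothetical helper name; it relies on `asyncio.get_running_loop()` (Python 3.7+), which raises `RuntimeError` when no loop is running in the current thread. Keep in mind that gevent schedules greenlets cooperatively within a single OS thread, so a per-thread check like this may not tell one greenlet from another.

```python
import asyncio


def loop_is_running() -> bool:
    """Return True if an asyncio event loop is running in the current thread.

    Hypothetical helper; asyncio.get_running_loop() raises RuntimeError
    when no loop is running.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


async def check_from_coroutine() -> bool:
    # Called while asyncio.run() is driving this coroutine.
    return loop_is_running()


if __name__ == "__main__":
    print("sync code:", loop_is_running())  # False
    print("inside a coroutine:", asyncio.run(check_from_coroutine()))  # True
```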

sbatchelder commented 2 years ago

I had trouble with this too and solved it similarly. For anyone who comes across this later:

I created a new non-greenlet django celery worker with a dedicated websocket queue "wsQ":

celery --app ${CELERY_APP} --broker "${CELERY_BROKER_URL}" worker --hostname=ws-worker@%h -l INFO --pool=solo -Q wsQ

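I created a task that handles both channel send and group_send:

from asgiref.sync import async_to_sync
from celery import shared_task
from channels.layers import get_channel_layer

# Routed to the dedicated "wsQ" worker (solo pool, no greenlets).
@shared_task(bind=True, name='queue_ws_event', ignore_result=True, queue='wsQ')
def queue_ws_event(self, ws_channel, ws_event: dict, group=True):
    channel_layer = get_channel_layer()
    if group:
        async_to_sync(channel_layer.group_send)(ws_channel, ws_event)
    else:
        async_to_sync(channel_layer.send)(ws_channel, ws_event)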

Now I can replace the troublesome async_to_sync(self.channel_layer.group_send)(ws_group, ws_event) websocket broadcast calls in my main greenlet task with queue_ws_event.delay(ws_group, ws_event).
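
If you would rather keep routing out of the task decorator, the `queue='wsQ'` option should be expressible through Celery's `task_routes` setting instead. The fragment below is a sketch that assumes a Django project whose Celery app loads settings with `app.config_from_object("django.conf:settings", namespace="CELERY")`; with that namespace the setting is spelled `CELERY_TASK_ROUTES`. Only the worker started with `-Q wsQ` then consumes these tasks.

```python
# settings.py (sketch). Assumes the CELERY settings namespace described above,
# so Celery's task_routes setting is spelled CELERY_TASK_ROUTES here.
CELERY_TASK_ROUTES = {
    # Key is the explicit task name given via name='queue_ws_event'.
    "queue_ws_event": {"queue": "wsQ"},
}
```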

WYmindsky commented 1 year ago

Replying to @udbhav's comment above: thanks, I can confirm that Python 3.6 is OK and Python 3.7 causes the problem.

pam-param-pam commented 9 months ago

Still having the same issue with channels 4.0.0. Somehow @sbatchelder's solution works; I wonder why?

sbatchelder commented 9 months ago

The solution works by not doing the actual websocket-send call in the greenlet thread. The send command is instead deferred to some other non-greenlet queue for execution. WHY it doesn't work in a greenlet process, that I can't say. Glad my workaround worked for you!
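
The deferral described above can be illustrated without Celery: a dedicated worker thread creates and owns its own event loop and executes async sends handed to it through a queue, so the caller never runs async code itself. This is a hypothetical standard-library sketch; `SendWorker` and `fake_group_send` are made-up names. In a gevent monkey-patched process `threading` itself may be patched, so treat this as an illustration of the design rather than a drop-in fix; the workaround above uses a separate solo-pool Celery worker for that reason.

```python
import asyncio
import queue
import threading
from typing import Any, Awaitable, Callable, List


class SendWorker:
    """Hypothetical sketch: runs async jobs on one dedicated thread and loop."""

    def __init__(self) -> None:
        self._jobs: queue.Queue = queue.Queue()
        self.errors: List[BaseException] = []
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        # This thread creates and owns a single event loop for its lifetime.
        loop = asyncio.new_event_loop()
        try:
            while True:
                job = self._jobs.get()
                if job is None:  # sentinel pushed by stop()
                    break
                async_fn, args = job
                try:
                    loop.run_until_complete(async_fn(*args))
                except Exception as exc:  # keep the worker alive on failure
                    self.errors.append(exc)
        finally:
            loop.close()

    def submit(self, async_fn: Callable[..., Awaitable[Any]], *args: Any) -> None:
        # Called from synchronous code; the caller never runs an event loop.
        self._jobs.put((async_fn, args))

    def stop(self) -> None:
        # Jobs already queued run first (FIFO), then the thread exits.
        self._jobs.put(None)
        self._thread.join()


sent: List[str] = []


async def fake_group_send(group: str, event: dict) -> None:
    # Hypothetical placeholder for channel_layer.group_send.
    await asyncio.sleep(0)
    sent.append(f"{group}:{event['type']}")


if __name__ == "__main__":
    worker = SendWorker()
    worker.submit(fake_group_send, "room-1", {"type": "chat.message"})
    worker.submit(fake_group_send, "room-2", {"type": "chat.message"})
    worker.stop()
    print(sent)  # ['room-1:chat.message', 'room-2:chat.message']
```

Because a single thread drains the queue in order, sends are executed in the order they were submitted, and one long-lived loop replaces the temporary loops that async_to_sync would otherwise create per call.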

francotrax commented 9 months ago

I have exactly the same problem. I'm running Celery with Gevent, and I'm trying to stream data from the background worker to websocket. As soon as the code runs async_to_sync(channel_layer.group_send) I get errors.