Open sohamnavadiya opened 6 years ago
Here is my log:
System check identified no issues (0 silenced).
August 09, 2018 - 15:38:13
Django version 2.0.7, using settings 'leadgeneration.settings'
Starting ASGI/Channels version 2.1.2 development server at http://0.0.0.0:8082/
Quit the server with CONTROL-C.
2018-08-09 15:38:13,066 - INFO - server - HTTP/2 support not enabled (install the http2 and tls Twisted extras)
2018-08-09 15:38:13,066 - INFO - server - Configuring endpoint tcp:port=8082:interface=0.0.0.0
2018-08-09 15:38:13,067 - INFO - server - Listening on TCP address 0.0.0.0:8082
[2018/08/09 15:38:19] WebSocket HANDSHAKING /lead/ [127.0.0.1:45618]
Connected {'type': 'websocket.connect'}
[2018/08/09 15:38:19] WebSocket CONNECT /lead/ [127.0.0.1:45618]
[2018/08/09 15:38:21] HTTP GET /lead-call/XXXXXXXXXX/ 200 [0.03, 127.0.0.1:45650]
[2018/08/09 15:38:21] WebSocket DISCONNECT /lead/ [127.0.0.1:45618]
Disconnect {'type': 'websocket.disconnect', 'code': 1001}
2018-08-09 15:38:32,087 - WARNING - server - Application instance <Task pending coro=<__call__() running at /home/soham/PycharmProjects/lead-generation/venv/lib/python3.5/site-packages/channels/sessions.py:175> wait_for=<Future pending cb=[Task._wakeup()]>> for connection <WebSocketProtocol client=['127.0.0.1', 45618] path=b'/lead/'> took too long to shut down and was killed.
This means something in your consumer is not exiting after the disconnect (you don't need to send websocket.close, so get rid of that) - it's not a problem in Channels from what I can tell.
If you can trace it down to a clear problem in channels with steps to reproduce from a fresh project, please re-open the bug - you can see more about where to ask questions at http://channels.readthedocs.io/en/latest/support.html
In my case, the problem was with redis.
Check if there are TCP connections in the CLOSE_WAIT state.
netstat | grep 6379 | fgrep -c CLOSE_WAIT
In my case I had around 50+ in such state. Had to update redis.
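For anyone who wants to run the same check from a script, here is a minimal sketch of the shell pipeline above in Python. It assumes numeric `netstat -tn` style output where addresses look like `host:port`; the function name and the sample lines are made up for illustration and are not taken from a real system.

```python
def count_close_wait(netstat_output, port=6379):
    """Count connections that involve `port` and sit in the CLOSE_WAIT state."""
    suffix = f":{port}"
    count = 0
    for line in netstat_output.splitlines():
        fields = line.split()
        # An address column such as 127.0.0.1:6379 ends with ":<port>"
        touches_port = any(field.endswith(suffix) for field in fields)
        if touches_port and "CLOSE_WAIT" in fields:
            count += 1
    return count


# Illustrative sample only (hypothetical addresses and ports)
SAMPLE_NETSTAT_OUTPUT = """\
tcp        0      0 127.0.0.1:45618   127.0.0.1:6379    ESTABLISHED
tcp        1      0 127.0.0.1:45620   127.0.0.1:6379    CLOSE_WAIT
tcp        1      0 127.0.0.1:45622   127.0.0.1:6379    CLOSE_WAIT
tcp        0      0 127.0.0.1:8082    127.0.0.1:45650   CLOSE_WAIT
"""

if __name__ == "__main__":
    print(count_close_wait(SAMPLE_NETSTAT_OUTPUT))
```

On a live machine the text could come from something like `subprocess.run(["netstat", "-tn"], capture_output=True, text=True).stdout`, assuming `netstat` is installed.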
I'm gonna say this here as it may help others. In my case the problem was just a silly mistake.
In the code I was maintaining, websocket_disconnect was defined without a call to super(), so StopConsumer() was never raised. The right way is to override the disconnect method, which AsyncWebsocketConsumer calls from inside websocket_disconnect. Simply not overriding websocket_disconnect solved my problem.
In the example above the same pattern exists. However I am not sure if that was the case back then in 2018 or this is a new style in Channels' code.
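To make the super() point concrete, here is a small stand-alone model of the pattern. It does not import Channels: `StopConsumer` and `BaseConsumer` below are simplified stand-ins written for this sketch, assuming (as described above) that the base websocket_disconnect calls disconnect() and then raises StopConsumer.

```python
import asyncio


class StopConsumer(Exception):
    """Stand-in for the exception that tells the server the consumer is done."""


class BaseConsumer:
    """Simplified stand-in for the generic consumer base class."""

    async def websocket_disconnect(self, message):
        await self.disconnect(message["code"])
        raise StopConsumer()

    async def disconnect(self, code):
        pass


class GoodConsumer(BaseConsumer):
    # Overrides only the hook, so the base class still raises StopConsumer
    async def disconnect(self, code):
        self.cleaned_up = True


class BadConsumer(BaseConsumer):
    # Overrides the low-level handler without calling super():
    # cleanup runs, but StopConsumer is never raised
    async def websocket_disconnect(self, message):
        self.cleaned_up = True


async def stops_cleanly(consumer):
    """Return True if the consumer signals that it has finished."""
    try:
        await consumer.websocket_disconnect(
            {"type": "websocket.disconnect", "code": 1001}
        )
    except StopConsumer:
        return True
    return False


if __name__ == "__main__":
    print(asyncio.run(stops_cleanly(GoodConsumer())))
    print(asyncio.run(stops_cleanly(BadConsumer())))
```

In this model the second consumer never signals that it is finished, which is the situation where the server would eventually give up waiting and kill the application instance.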
I have a similar situation, but not the same. I have:
- django == 2.2.9
- channels == 2.4.0
- daphne == 2.5.0
I am using Redis as the backend with this code:
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [(CELERY_BROKER_URL)],
            'capacity': 300
        },
    },
}
ASGI_THREADS = 1000
I am using Daphne for both HTTP and WebSocket.
My asgi file:
import django
django.setup()
import chat.routing
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings')
from channels.http import AsgiHandler
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
application = ProtocolTypeRouter({
    'websocket': AuthMiddlewareStack(
        URLRouter(
            chat.routing.websocket_urlpatterns
        )
    ),
})
Here is the error message:
Application instance <Task pending coro=<AsgiHandler.__call__() running at /usr/local/lib/python3.6/site-packages/channels/http.py:192> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.6/asyncio/futures.py:403, <TaskWakeupMethWrapper object at 0x7f301c57f738>()]>> for connection <WebRequest at 0x7f301c56a710 method=POST uri=/api/v1/analytics/divided/meteo/ clientproto=HTTP/1.0> took too long to shut down and was killed.
This happens when the front end sends requests to Django. Sometimes it returns the response immediately, sometimes it takes a long time, and frequently it throws the above error. I thought it was because of the concurrency of the worker that is responsible for HTTP requests.
It is not the same case as mentioned above, because:
- I am using the generic "WebsocketConsumer" because I am querying data from the database via Django's ORM
- The error occurs in AsgiHandler.__call__() on an HTTP request. The WebSocket connection is not even established.
Can you explain why I have this issue? I thought the reason was a Django version incompatibility.
I'm facing the same issue in my project.
In my particular case asgiref==3.3.1 was the culprit. I rolled back to 3.2.10 and it works well! I have daphne 2.5.0 / channels 2.4.0 / django 3.0.10 / djangorestframework 3.11.1.
I could not solve the problem the way I wanted. Instead, I run two containers:
1) The first container runs Django without asgi (in wsgi mode) via the command ./manage.py runserver --noasgi
2) The second container runs Django for WebSocket only, through daphne, via the command daphne -u /tmp/daphne.sock -b 0.0.0.0 -p 8088 app.asgi:application -v 2
Then I used Nginx as the reverse proxy to route all requests with the "ws/" path to the second container, and the rest to the first container. Here is my conf.template code:
map $http_upgrade $connection_upgrade {
    default upgrade;
    '' close;
}
upstream websocket {
    server app_websocket:8088;
}
server {
    ...............
    location / {
        proxy_pass ${PROXY_PASS};
        proxy_set_header Host ${DOMAIN_NAME};
        #client_max_body_size 100M;
        #proxy_buffering on;
        #proxy_buffers 256 4k;
        #proxy_busy_buffers_size 4k;
        #proxy_read_timeout 60s;
        #proxy_send_timeout 60s;
        #proxy_max_temp_file_size 1024m;
        #proxy_temp_file_write_size 4k;
    }
    location /ws/ {
        proxy_pass http://websocket;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
        proxy_set_header Host ${DOMAIN_NAME};
    }
    ............
}
Rolling back asgiref to 3.2.10, as described above, solved the issue for us. The app had started to time out requests and emit those warnings.
Pinning versions with pip freeze seems to be a must.
I have the same issue, I tried the solution from @MrVhek and unfortunately it didn't work. Any other hint ? Thanks
@andrewgodwin I can reproduce this by changing the chat example in document:
import json
import asyncio
import logging
from channels.generic.websocket import AsyncWebsocketConsumer

class ChatConsumer(AsyncWebsocketConsumer):
    def __init__(self, *args, **kwargs):
        super(ChatConsumer, self).__init__(*args, **kwargs)
        self.room_name = None
        self.room_group_name = None
        self.disconnected = True

    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'
        await self.channel_layer.group_add(self.room_group_name, self.channel_name)
        await self.accept()
        self.disconnected = False  # connected, set flag to False

    async def disconnect(self, code):
        await self.channel_layer.group_discard(self.room_group_name, self.channel_name)
        self.disconnected = True  # disconnected, set flag to True

    async def receive(self, text_data=None, bytes_data=None):
        text_json = json.loads(text_data)
        message = text_json['message']
        await self.channel_layer.group_send(self.room_group_name, {
            'type': 'chat_message',
            'message': message,
        })

    async def chat_message(self, event):
        message = event['message']
        # now simulate subscribe to redis, get the newest state, and send to user
        # if disconnected, stop sending states
        i = 0
        while not self.disconnected:
            await asyncio.sleep(1)  # simulate sub to redis and wait new state
            i += 1
            logging.error('sending...')
            await self.send(text_data=json.dumps({
                'message': f'new state {i}'
            }))
and then
It seems that the server-push code (here chat_message) prevents disconnect() from being called, so the self.disconnected flag never gets a chance to be set to True.
And here is my log output:
System check identified no issues (0 silenced).
January 31, 2021 - 17:48:26
Django version 3.1.5, using settings 'light.settings'
Starting ASGI/Channels version 3.0.3 development server at http://127.0.0.1:8000/
Quit the server with CONTROL-C.
WebSocket HANDSHAKING /ws/chat/cc/ [127.0.0.1:43922]
WebSocket CONNECT /ws/chat/cc/ [127.0.0.1:43922]
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
WebSocket DISCONNECT /ws/chat/cc/ [127.0.0.1:43922] <<<<<=== Client closed here. but still sending states to client
INFO:django.channels.server:WebSocket DISCONNECT /ws/chat/cc/ [127.0.0.1:43922]
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
WARNING:daphne.server:Application instance <Task pending name='Task-1' coro=<StaticFilesWrapper.__call__() running at /home/felix/PycharmProjects/light/venv/lib/python3.9/site-packages/channels/staticfiles.py:44> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f36dc9dea30>()]>> for connection <WebSocketProtocol client=['127.0.0.1', 43922] path=b'/ws/chat/cc/'> took too long to shut down and was killed.
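The log above is consistent with messages being handled one at a time. The following is a simplified model in plain asyncio, not Channels' actual code: `run_consumer` is a hypothetical dispatch loop that awaits each handler to completion before reading the next message, and `asyncio.wait_for` plays the role of the server giving up on the application after a timeout.

```python
import asyncio


async def run_consumer(inbox, handlers, log):
    """Toy dispatch loop: one message at a time, each handler awaited fully."""
    while True:
        message = await inbox.get()
        log.append(f"dispatch {message['type']}")
        await handlers[message["type"]](message)
        if message["type"] == "disconnect":
            return


async def demo(timeout=0.2):
    log = []
    state = {"disconnected": False}

    async def chat_message(message):
        # Same shape as the handler in the report: loops until the flag flips
        while not state["disconnected"]:
            await asyncio.sleep(0.01)

    async def disconnect(message):
        state["disconnected"] = True

    inbox = asyncio.Queue()
    await inbox.put({"type": "chat_message"})
    await inbox.put({"type": "disconnect"})  # queued, but never dispatched

    handlers = {"chat_message": chat_message, "disconnect": disconnect}
    try:
        await asyncio.wait_for(run_consumer(inbox, handlers, log), timeout)
        killed = False
    except asyncio.TimeoutError:
        killed = True  # the loop had to be cancelled from outside
    return log, killed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this model the disconnect message stays in the queue because the loop is still awaiting chat_message, so the flag can never flip and the loop only ends when it is cancelled.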
@tinylambda thanks for the extra info. Not sure yet if this is a usage issue we should document or something else, but I will reopen to investigate
@carltongibson Thanks for the reply. In my use case, I want to use django-channels as an access layer, and keep sending new game state to user either by time interval (every 1 second) or states (state changed) from redis server. I think it should be a typical use case when it comes to websockets.
If the coroutine that handles a message (here chat_message) takes too long to complete (for example, to broadcast messages), it blocks the processing of every later message of the same type until the previous coroutine completes, which may be never.
As a potential solution, how about dispatching every handler coroutine (chat_message) as a task (loop.create_task), tracking every task instance in a central place (per consumer instance), checking the tasks at some point (when and how?) to clear the completed ones, and cancelling all tasks that are still active when the websocket disconnects?
We can add a MAX_ACTIVE_TASKS setting to limit how many tasks can be created, to avoid creating too many slow tasks.
With MAX_ACTIVE_TASKS=1, we can use the django-channels server as a broadcast service. With MAX_ACTIVE_TASKS > 1, we can process several messages of the same type concurrently.
It's comparable: creating more tasks to handle some type of message in a consumer instance (for one stateful connection) is like creating more threads or processes to handle user requests in an HTTP web server (for all stateless user requests).
I implemented a Consumer to dispatch message handler as tasks (only affect user defined tasks):
import asyncio
import copy
import collections
import functools
import json
from channels.consumer import get_handler_name
from channels.generic.websocket import AsyncWebsocketConsumer

class ChatConsumer(AsyncWebsocketConsumer):
    MAX_ACTIVE_TASKS = 2

    def __init__(self, *args, **kwargs):
        super(ChatConsumer, self).__init__(*args, **kwargs)
        self.handler_tasks = collections.defaultdict(list)
        self.joined_groups = set()
        self.room_name = None
        self.room_group_name = None

    def complete_task(self, task_instance, handler_name):
        print(f'Complete task for handler {handler_name}, task instance {task_instance}')
        self.handler_tasks[handler_name].remove(task_instance)
        print(
            f'There are still {len(self.handler_tasks[handler_name])} active tasks for'
            f' handler {handler_name}'
        )

    async def dispatch(self, message):
        handler_name = get_handler_name(message)
        handler = getattr(self, handler_name, None)
        if handler:
            if handler_name.startswith('chat_'):
                # Create a task to process message
                loop = asyncio.get_event_loop()
                if len(self.handler_tasks[handler_name]) >= self.MAX_ACTIVE_TASKS:
                    await self.send(text_data=json.dumps({
                        'message': 'MAX_ACTIVE_TASKS reached'
                    }))
                else:
                    handler_task = loop.create_task(handler(message))
                    # don't forget to remove the task from self.handler_tasks
                    # when task completed
                    handler_task.add_done_callback(
                        functools.partial(self.complete_task, handler_name=handler_name)
                    )
                    self.handler_tasks[handler_name].append(handler_task)
            else:
                # The old way to process message
                await handler(message)
        else:
            raise ValueError("No handler for message type %s" % message["type"])

    async def clear_handler_tasks(self):
        for handler_name in self.handler_tasks:
            task_instances = self.handler_tasks[handler_name]
            for task_instance in task_instances:
                task_instance.cancel()
                # try:
                #     await task_instance
                # except asyncio.CancelledError:
                #     print('Cancelled handler task', task_instance)

    async def disconnect(self, code):
        joined_groups = copy.copy(self.joined_groups)
        for group_name in joined_groups:
            await self.leave_group(group_name)
        self.joined_groups.clear()
        await self.clear_handler_tasks()

    async def leave_group(self, group_name):
        await self.channel_layer.group_discard(
            group_name, self.channel_name
        )
        self.joined_groups.remove(group_name)

    async def join_group(self, group_name):
        await self.channel_layer.group_add(
            group_name, self.channel_name
        )
        self.joined_groups.add(group_name)

    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'
        await self.join_group(self.room_group_name)
        await self.accept()

    async def receive(self, text_data=None, bytes_data=None):
        text_json = json.loads(text_data)
        message = text_json['message'].strip()
        if message.endswith('1'):
            await self.channel_layer.group_send(self.room_group_name, {
                'type': 'chat_message',
                'message': message,
            })
        elif message.endswith('2'):
            await self.channel_layer.group_send(self.room_group_name, {
                'type': 'chat_message2',
                'message': message,
            })
        else:
            await self.send(text_data=json.dumps({
                'message': 'invalid data'
            }))

    async def chat_message(self, event):
        message = event['message']
        while True:
            print('sending...')
            await self.send(text_data=json.dumps({
                'message': message
            }))
            await asyncio.sleep(1)

    async def chat_message2(self, event):
        message = event['message']
        await self.send(text_data=json.dumps({
            'message': message
        }))
In this implementation, you can send a message such as "g1" to start the long-running handler (chat_message), and you can still send "g2" to start the chat_message2 handler. If you send other data that does not end with 1 or 2, you will receive an "invalid data" reply.
Client input and output:
(venv) localhost:light Felix$ python manage.py chat_client
connected
g1
Received: {"message": "g1"}
gReceived: {"message": "g1"}
Received: {"message": "g1"}
2
Received: {"message": "g2"}
Received: {"message": "g1"}
g2
Received: {"message": "g2"}
Received: {"message": "g1"}
g2
Received: {"message": "g2"}
Received: {"message": "g1"}
g1
Received: {"message": "MAX_ACTIVE_TASKS reached"}
Received: {"message": "g1"}
gReceived: {"message": "g1"}
Received: {"message": "g1"}
g1
Received: {"message": "MAX_ACTIVE_TASKS reached"}
Received: {"message": "g1"}
gReceived: {"message": "g1"}
x
Received: {"message": "invalid data"}
Received: {"message": "g1"}
gxReceived: {"message": "g1"}
Received: {"message": "invalid data"}
Received: {"message": "g1"}
Received: {"message": "g1"}
^CDone
Server input and output:
Django version 3.1.5, using settings 'light.settings'
Starting ASGI/Channels version 3.0.3 development server at http://127.0.0.1:8000/
Quit the server with CONTROL-C.
WebSocket HANDSHAKING /ws/chat/cc/ [127.0.0.1:57667]
WebSocket CONNECT /ws/chat/cc/ [127.0.0.1:57667]
sending...
sending...
sending...
Complete task for handler chat_message2, task instance <Task finished name='Task-17' coro=<ChatConsumer.chat_message2() done, defined at /Users/Felix/PycharmProjects/light/chat/consumers.py:116> result=None>
There are still 0 active tasks for handler chat_message2
sending...
Complete task for handler chat_message2, task instance <Task finished name='Task-23' coro=<ChatConsumer.chat_message2() done, defined at /Users/Felix/PycharmProjects/light/chat/consumers.py:116> result=None>
There are still 0 active tasks for handler chat_message2
sending...
Complete task for handler chat_message2, task instance <Task finished name='Task-29' coro=<ChatConsumer.chat_message2() done, defined at /Users/Felix/PycharmProjects/light/chat/consumers.py:116> result=None>
There are still 0 active tasks for handler chat_message2
sending...
sending...
sending...
sending...
sending...
sending...
sending...
sending...
sending...
sending...
WebSocket DISCONNECT /ws/chat/cc/ [127.0.0.1:57667]
Complete task for handler chat_message, task instance <Task cancelled name='Task-11' coro=<ChatConsumer.chat_message() done, defined at /Users/Felix/PycharmProjects/light/chat/consumers.py:106>>
There are still 0 active tasks for handler chat_message
Cancelled handler task <Task cancelled name='Task-11' coro=<ChatConsumer.chat_message() done, defined at /Users/Felix/PycharmProjects/light/chat/consumers.py:106>>
It's a full-duplex websocket consumer now: we can send data to the websocket server at any time and get a response, and the websocket server can send data at any time too. With the new implementation, a long-running handler no longer blocks the consumer.
So, would you be interested in implementing this at the Channels framework level? @carltongibson @andrewgodwin
You saved me from a lot of trouble. May I ask why asgiref is the issue and how you found it?
Thanks!! Rolling back asgiref worked for me.
We're having the same problem using GraphQL, so I'd remove djangorestframework from the picture.
Experiencing the same with GraphQL.
Has this been solved? I have the same issue and I haven't been able to fix it.
In my case the disk was full. I freed up some space and that solved it!
For anyone else with this error: my solution was to downgrade aioredis to 1.3.0. The newest version (2.0) is a complete rewrite that requires restructuring code, which I believe channels-redis has not taken into account when requiring aioredis.
I have a Django project. I'm getting cpu spikes to 100% and many many of these logs at the same time,
2021-12-05 13:18:34,912 WARNING Application instance <Task pending coro=<AsgiHandler.__call__() running at /usr/local/lib/python3.6/site-packages/channels/http.py:192> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.6/asyncio/futures.py:403, <TaskWakeupMethWrapper object at 0x7fe6bf3625e8>()]>> for connection <WebRequest at 0x7fe6bf361898 method=GET uri=/********** clientproto=HTTP/1.1> took too long to shut down and was killed.
2021-12-05 13:18:39,921 WARNING Application instance <Task pending coro=<AsgiHandler.__call__() running at /usr/local/lib/python3.6/site-packages/channels/http.py:192> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.6/asyncio/futures.py:403, <TaskWakeupMethWrapper object at 0x7fe6be55b6a8>()]>> for connection <WebRequest at 0x7fe6be55a898 method=GET uri=/********** clientproto=HTTP/1.1> took too long to shut down and was killed.
2021-12-05 13:18:39,922 WARNING Application instance <Task pending coro=<AsgiHandler.__call__() running at /usr/local/lib/python3.6/site-packages/channels/http.py:192> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.6/asyncio/futures.py:403, <TaskWakeupMethWrapper object at 0x7fe6be300ac8>()]>> for connection <WebRequest at 0x7fe6be30fd68 method=GET uri=/********** clientproto=HTTP/1.1> took too long to shut down and was killed.
requirements.txt
django==2.2
djangorestframework
mysqlclient==1.3.13
django-push-notifications
markdown
python-dotenv
twilio==6.10.0
gunicorn==19.8.1
django-modeladmin-reorder
django-cors-headers==3.2.1
python-dateutil==2.8.1
channels==2.4.0
channels-redis==2.4.2
daphne==2.4.1
redis
pycountry==19.8.18
country-list==1.0.0
django-admin-rangefilter==0.6.0
django-import-export==2.1.0
django-storages==1.9.
Pillow==8.2.0
boto3==1.17.102
apns2==0.5.0
pip freeze
aioredis==1.3.1
apns2==0.5.0
asgiref==3.4.0
async-timeout==3.0.1
attrs==21.2.0
autobahn==21.2.1
Automat==20.2.0
boto3==1.17.102
botocore==1.20.102
certifi==2021.5.30
cffi==1.14.5
channels==2.4.0
channels-redis==2.4.2
chardet==4.0.0
constantly==15.1.0
country-list==1.0.0
cryptography==3.4.7
daphne==2.4.1
defusedxml==0.7.1
diff-match-patch==20200713
Django==2.2
django-admin-rangefilter==0.6.0
django-cors-headers==3.2.1
django-import-export==2.1.0
django-modeladmin-reorder==0.3.1
django-push-notifications==2.0.0
django-storages==1.9
djangorestframework==3.12.4
et-xmlfile==1.1.0
gunicorn==19.8.1
h2==2.6.2
hiredis==2.0.0
hpack==3.0.0
http-ece==1.1.0
hyper==0.7.0
hyperframe==3.2.0
hyperlink==21.0.0
idna==2.10
importlib-metadata==4.6.0
incremental==21.3.0
jmespath==0.10.0
Markdown==3.3.4
MarkupPy==1.14
msgpack==0.6.2
mysqlclient==1.3.13
odfpy==1.4.1
openpyxl==3.0.7
Pillow==8.2.0
py-vapid==1.8.2
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycountry==19.8.18
pycparser==2.20
PyJWT==2.1.0
pyOpenSSL==20.0.1
PySocks==1.7.1
python-dateutil==2.8.1
python-dotenv==0.18.0
pytz==2021.1
pywebpush==1.13.0
PyYAML==5.4.1
redis==3.5.3
requests==2.25.1
s3transfer==0.4.2
service-identity==21.1.0
six==1.16.0
sqlparse==0.4.1
tablib==3.0.0
twilio==6.10.0
Twisted==21.2.0
txaio==21.2.1
typing-extensions==3.10.0.0
urllib3==1.26.6
xlrd==2.0.1
xlwt==1.3.0
zipp==3.4.1
zope.interface==5.4.0
Please help me out over here, Thanks!
@ReadMost I have been facing the same problem for some weeks. How did you solve it?
I managed to resolve this issue in local dev without needing to use both wsgi and asgi servers together (channels==3.0.4, asgiref==3.5.0, Django==4.0.3). I tried a combination of:
1. Using ReconnectingWebSocket on the client with these options:
const options = {
    WebSocket: WebSocket,
    connectionTimeout: 10000,
    maxRetries: 3,
    minUptime: 2000,
    maxEnqueuedMessages: 1000000,
};
const newSocket = new ReconnectingWebSocket(
    'ws://' + window.location.host + '/ws/somepath/' + roomName + '/' + '?' + queryString,
    [],
    options
);
2. Setting thread_sensitive to False globally, and setting it to True manually for ORM operations. i.e. using @sync_to_async(thread_sensitive=True) as a decorator kwarg for sync functions involving writing to the database.
3. If you have a `connect()` method that involves long running tasks, simply call await self.accept() as early as possible within the method, right below `await self.channel_layer.group_add()`.
Initially, this worked fine for connection timeouts. However, in my particular case, my disconnect method requires a relatively lengthy ORM operation involving celery workers. Therefore, after switching to a remote database, the disconnect method became an issue once again.
Fortunately, there is a simple workaround.
Instead of using the development server (i.e. `python manage.py runserver`), run it in production mode with Daphne. That way, you can specify a timeout interval via the `--application-close-timeout` flag.
Simply run:
```zsh
daphne -t 60 --application-close-timeout 60 your_project.asgi:application
```
The -t flag is for --http-timeout. It may not be necessary.
Of course, with this approach, you will need to serve any local static files through `STATIC_ROOT`. Whitenoise can be used to serve static files. It will work with an ASGI server (within Django anyway; outside of Django I don't think it supports ASGI). If you don't want file caching, replace their suggested STATICFILES_STORAGE with:
STATICFILES_STORAGE = 'whitenoise.storage.CompressedStaticFilesStorage'
Hope this helps someone. Thanks to everyone for such an awesome project!
@benedictkioko I have the same problem using graphene_django + channels. I solved it with the command "daphne -b 0.0.0.0 -p 8000 --application-close-timeout 60 --proxy-headers core.asgi:application". Hope this can help someone.
From the daphne help doc: --application-close-timeout APPLICATION_CLOSE_TIMEOUT: The number of seconds an ASGI application has to exit after client disconnect before it is killed
Changing from asgiref==3.2.7 to asgiref==3.2.10 worked for me on Django 2.2.
I am not sure if my issue is the same, but it results in a similar error. In my case, I am running my Django Channels ASGI app using Daphne, and I have a couple API calls that call an external API. I found that when that external API errored out and I did not have a timeout set on the requests call, it would hang the Daphne server. While I did have a flaw in my code, I do not think that such a flaw should render the entire server unresponsive and not able to be restarted through systemctl. Note that when I switched to uvicorn, I no longer had an issue even with the misbehaving code. In my real application, I have of course fixed my external API calls to have an explicit timeout. Here is a simplified repro case based on my real application: https://github.com/jessamynsmith/daphne_hang/
Here is my exact error:
2023-02-14 05:01:42,735 WARNING Application instance <Task pending name='Task-6' coro=<ProtocolTypeRouter.call() running at /Users/jessamynsmith/Development/daphne_hang/venv/lib/python3.11/site-packages/channels/routing.py:62> wait_for=<Future pending cb=[_chain_future.
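As an illustration of the explicit-timeout fix mentioned above, here is a minimal sketch. It uses the standard library's `urllib` instead of the `requests` package from the original report so that it runs without extra dependencies; `fetch_with_timeout` and `start_silent_server` are hypothetical helpers written for this example, and the silent server is only a local stand-in for a misbehaving external API.

```python
import socket
import urllib.request


def fetch_with_timeout(url, timeout_seconds=5.0):
    """Return the response body, or None if the call fails or times out."""
    try:
        with urllib.request.urlopen(url, timeout=timeout_seconds) as response:
            return response.read()
    except OSError:
        # URLError, HTTPError and socket timeouts are all OSError subclasses,
        # so an unresponsive server ends up here instead of hanging forever
        return None


def start_silent_server():
    """Hypothetical misbehaving API: it listens but never sends a response."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
    server.listen(1)
    return server


if __name__ == "__main__":
    server = start_silent_server()
    port = server.getsockname()[1]
    print(fetch_with_timeout(f"http://127.0.0.1:{port}/", timeout_seconds=0.5))
    server.close()
```

With `requests`, the equivalent idea would be passing a `timeout` argument to the call. Either way, the timeout only bounds how long the blocking call can hold up the server; it does not make the call non-blocking.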
@jessamynsmith Thank you so much for posting this!! I thought I was losing my mind π I'm in a similar situation -- my API calls are taking too long, so if a user navigates away from the page where the websocket was instantiated, there's no chance for it to cleanly exit, leading to the "took too long to shut down and was killed" error message.
Looking back on this thread, in the case of blocking I/O or CPU intensive operations during a disconnect method, it might be worth offloading those tasks to a worker, with something like celery or RQ.
That was my thought too. I was considering using Celery, but Celery isn't meant to be a real-time queue, and more importantly, if you offload the task to Celery you would have a hard(er) time pushing the result back to the channels consumer.
I guess you could submit the task and then continually poll it, waiting for it to be complete, but I didn't like that approach.
Basically, what I did was create a Queue and a Thread:
import json
from queue import Queue
from threading import Event, Thread

from channels.generic.websocket import WebsocketConsumer


class EditorConsumer(WebsocketConsumer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        ...
        self.requests_queue = Queue()
        self.requests_thread = None
        self.stop_event = Event()

    def connect(self):
        # start a background thread that works through incoming requests
        self.requests_thread = Thread(
            target=self.process_requests,
            daemon=True
        )
        self.requests_thread.start()
        ...
        self.accept()

    def disconnect(self, code):
        # signal the worker thread to stop; do not block here
        self.stop_event.set()

    def receive(self, text_data=None, bytes_data=None):
        # hand the message to the worker thread instead of processing inline
        self.requests_queue.put(text_data)

    def process_requests(self):
        while not self.stop_event.is_set():
            text_data = self.requests_queue.get()
            ...
            # NOTE: It's worth checking if the `stop_event` is set here. If you try to call
            # `self.send` afterwards, you're likely going to error out
            if not self.stop_event.is_set():
                # send the response back to the client
                self.send(text_data=json.dumps(output_data))
Using this setup, disconnect is allowed to run and raise the StopConsumer exception. The process_requests daemon thread is left to finish whatever item it is currently working on from the queue, then it exits, regardless of whether the queue is empty.
It's probably not the most "elegant" way to solve the problem, but it keeps the solution entirely inside the consumers.py file without having to involve Celery, Redis, etc., which has additional overhead (and again, Celery isn't meant to be real-time, which IMO, doesn't make it the right choice when working with websockets).
Edit: If my logic/thought process about not using Celery here is incorrect I would love to know.
Update: I refactored the process_requests method to ensure the daemon thread properly shuts down.
    # NOTE: relies on `import queue`, a `self.disconnected` Event, and a
    # `QUEUE_TIMEOUT` class attribute, none of which are shown here
    def process_requests(self):
        # loop indefinitely
        while True:
            # attempt to grab the next task from the queue
            try:
                # grab the next task
                text_data = self.requests_queue.get(
                    timeout=self.QUEUE_TIMEOUT)
                ...
                # only send the response if the connection is open
                if not self.disconnected.is_set():
                    # send the response back to the client
                    self.send(text_data=json.dumps(output_data))
                # otherwise, the connection is closed, so break from the loop
                else:
                    break
            # the queue is empty, which can naturally happen if there are no
            # requests for us to process
            except queue.Empty:
                # check if the connection has been closed, and if so, break
                # from the loop, so we can exit the thread
                if self.disconnected.is_set():
                    break
        print("SHUTTING DOWN THREAD")
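To try the queue-with-timeout pattern above outside of a Channels project, here is a rough stdlib-only sketch. It is not the consumer itself: RequestWorker, handle, results, and the 0.1 second QUEUE_TIMEOUT are all hypothetical stand-ins, and appending to results takes the place of self.send. The point it illustrates is that the worker thread wakes up periodically, notices the disconnect flag, and exits even when the queue is empty.

```python
import queue
import threading
import time


class RequestWorker:
    QUEUE_TIMEOUT = 0.1  # seconds between checks of the disconnect flag (assumed value)

    def __init__(self, handle):
        self.handle = handle  # hypothetical callable that does the slow work
        self.requests_queue = queue.Queue()
        self.disconnected = threading.Event()
        self.results = []  # stands in for messages sent back to the client
        self.thread = threading.Thread(target=self.process_requests, daemon=True)

    def connect(self):
        self.thread.start()

    def receive(self, text_data):
        self.requests_queue.put(text_data)

    def disconnect(self):
        # only set the flag; never block in the disconnect path
        self.disconnected.set()

    def process_requests(self):
        while not self.disconnected.is_set():
            try:
                item = self.requests_queue.get(timeout=self.QUEUE_TIMEOUT)
            except queue.Empty:
                continue  # nothing to do; loop back and re-check the flag
            output = self.handle(item)
            # skip the "send" if the client went away while we were working
            if not self.disconnected.is_set():
                self.results.append(output)


def demo():
    worker = RequestWorker(str.upper)
    worker.connect()
    worker.receive("ping")
    # wait (at most 2 seconds) for the worker to process the message
    deadline = time.monotonic() + 2
    while not worker.results and time.monotonic() < deadline:
        time.sleep(0.01)
    worker.disconnect()
    worker.thread.join(timeout=2)
    return worker.results, worker.thread.is_alive()
```

Calling demo() returns the processed results and whether the thread is still alive after disconnect. The trade-off of polling with a timeout is that shutdown can lag by up to QUEUE_TIMEOUT plus the time of whatever item is in progress.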
@jrosebr1 Nice! It depends on the use case I guess, but if you need to send a result to the client from outside the consumer class, you could send it via a channel layer. You could also use something like python-socketio instead of channels to emit a message from an external process.
Thanks @Saran33, I'll take a look at those.
@jessamynsmith saved my life
The first message succeeds, but further messages throw this exception:
My consumer class
requirements.txt
How to solve my issue?
Thanks in advance.