Open drpancake opened 3 years ago
I found this old issue in Celery that is perhaps helpful: https://github.com/celery/django-celery/issues/121
Also this blog post: https://tryolabs.com/blog/2014/02/12/long-running-process-and-django-orm/
And one more lead: https://stackoverflow.com/a/37891165/3211027
Is the DbConnectionsMiddleware correctly configured in your app? Make sure it has a higher priority than the admin middleware (i.e. comes before it in the middleware list).
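A quick way to verify the ordering described above is a small check over the middleware list. This is only a sketch: the helper `db_middleware_precedes_admin` and the shortened list are hypothetical, and only the two dotted paths come from this thread.

```python
# Dotted paths as they appear in this thread.
DB_MW = "django_dramatiq.middleware.DbConnectionsMiddleware"
ADMIN_MW = "django_dramatiq.middleware.AdminMiddleware"


def db_middleware_precedes_admin(middleware):
    """Return True if DbConnectionsMiddleware is listed before AdminMiddleware.

    Hypothetical helper for illustration; not part of django_dramatiq.
    """
    try:
        return middleware.index(DB_MW) < middleware.index(ADMIN_MW)
    except ValueError:
        # At least one of the two entries is missing from the list.
        return False


# Shortened example list (an assumption, not anyone's real settings).
EXAMPLE_MIDDLEWARE = [
    "dramatiq.middleware.Retries",
    DB_MW,
    ADMIN_MW,
]

print(db_middleware_precedes_admin(EXAMPLE_MIDDLEWARE))
```

The same function can be pointed at the `MIDDLEWARE` entry of your own broker settings, for example in a unit test.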
@Bogdanp Yes, it's in the correct order. My MIDDLEWARE is set up as follows:
"dramatiq.middleware.AgeLimit",
"dramatiq.middleware.TimeLimit",
"dramatiq.middleware.Retries",
"django_dramatiq.middleware.DbConnectionsMiddleware",
"django_dramatiq.middleware.AdminMiddleware",
"ss.web.utils.dramatiq.SentryMiddleware",
I'm also now seeing this error around 50 times per hour: 'NoneType' object has no attribute 'decode'. It's raised from the decode() method in dramatiq.encoder.JSONEncoder.
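For readers unfamiliar with this message: it is what Python raises when `.decode()` is called on `None` instead of on bytes. The sketch below uses a hypothetical `decode_payload` function as a stand-in for a JSON message decoder. It is not dramatiq's actual code, and it assumes the decoder received `None` where a raw message payload was expected.

```python
import json


def decode_payload(data):
    """Hypothetical stand-in for a JSON message decoder: bytes -> str -> dict."""
    return json.loads(data.decode("utf-8"))


# Normal case: the payload is bytes.
print(decode_payload(b'{"actor_name": "example"}'))

# Failure case: the payload is None (assumed here to model a missing message body).
try:
    decode_payload(None)
except AttributeError as exc:
    error_message = str(exc)
    print(error_message)
```

If this is what is happening, the thing to investigate is why the payload is empty, not the decoder itself.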
I currently have CONN_MAX_AGE=0 set in Django. Would you recommend changing this setting?
No, that setting should be fine (if a little inefficient). Re. the decoding error, are you using the Redis broker? If so, you should try upgrading to dramatiq v1.11.0, which has some related fixes and improvements.
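To illustrate why CONN_MAX_AGE=0 is safe but inefficient, here is a toy model of connection expiry. It is a simplification based on Django's documented semantics (0 means close the connection at the end of each request, None means keep it open indefinitely) and is not Django's real implementation. `FakeConnection` and `survives_cleanup` are hypothetical names.

```python
class FakeConnection:
    """Toy model of a persistent connection's expiry; not Django's real class."""

    def __init__(self, conn_max_age, now):
        # None means "never expire"; 0 means "already expired at the next cleanup".
        self.close_at = None if conn_max_age is None else now + conn_max_age
        self.is_open = True

    def close_if_obsolete(self, now):
        """Close the connection if its maximum age has been reached."""
        if self.close_at is not None and now >= self.close_at:
            self.is_open = False


def survives_cleanup(conn_max_age, seconds_elapsed):
    """Return True if a connection opened at t=0 is still open after a cleanup."""
    conn = FakeConnection(conn_max_age, now=0.0)
    conn.close_if_obsolete(now=seconds_elapsed)
    return conn.is_open


print(survives_cleanup(0, 0.0))       # max age 0: closed at every cleanup
print(survives_cleanup(500, 5.0))     # max age 500 s: reused while still young
print(survives_cleanup(500, 600.0))   # max age 500 s: closed once too old
print(survives_cleanup(None, 1e6))    # no max age: never closed for age alone
```

Under this model, a worker with a max age of 0 reconnects for every task, which costs time but should never reuse a stale connection.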
@Bogdanp Yep using Redis as the broker. I'll try that, thanks.
Any clues about the connection already closed error? I'm happy to dig into this and try to fix it myself, but I'm not exactly sure where to begin! I looked at the source and I can see that this library does handle old/broken DB connections, so I wonder if this is some obscure race condition that's only happening because I'm running a lot of tasks.
I'm really not sure what it could be. I would probably start by making sure the clean up code in the connections middleware runs at the right time and there's nothing else that might be opening up connections after it and leaving them open.
I don't actually use django_dramatiq but I just ran into the same problem with plain Django + Dramatiq. Are you wrapping any DB calls in sync_to_async?
Here's a standalone Django management command to recreate it:
import asyncio
from time import sleep

from asgiref.sync import sync_to_async
from django.core.management.base import BaseCommand

# AModel stands for any model in your project; import it from your own app.


class Command(BaseCommand):
    def handle(self, *args, **options):
        async def test():
            print('Getting')
            return await sync_to_async(AModel.objects.get)(id=1234)

        # Each loop of the while block is akin to a Dramatiq task under a long
        # running worker
        while True:
            task = asyncio.ensure_future(test())
            asyncio.get_event_loop().run_until_complete(asyncio.wait([task]))
            try:
                print(task.result())
            except Exception as e:
                print(e)
            sleep(5)
While the above is running kill the connection in Postgres with the following:
SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid != pg_backend_pid() AND query != '';
Then it should start spitting out "connection already closed" errors.
It seems like extra DB cleanup is needed for any async threads or it could be a Django bug?
A fix seems to be to send db.close_old_connections() through sync_to_async() so that it finds and cleans up any connections in there.
import asyncio
from django import db
from time import sleep

from asgiref.sync import sync_to_async
from django.core.management.base import BaseCommand


class Command(BaseCommand):
    def handle(self, *args, **options):
        async def test():
            return await sync_to_async(AModel.objects.get)(id=1234)

        # Each loop of the while block is akin to a Dramatiq task under a long
        # running worker
        while True:
            try:
                print('SYNC', AModel.objects.get(id=1234))
            except Exception as e:
                print(e)
            task = asyncio.ensure_future(test())
            asyncio.get_event_loop().run_until_complete(asyncio.wait([task]))
            try:
                print('ASYNC', task.result())
            except Exception as e:
                print(e)
            # Remove dead sync connections
            db.close_old_connections()
            # Remove dead async connections
            asyncio.get_event_loop().run_until_complete(
                sync_to_async(db.close_old_connections)()
            )
            sleep(5)
Thanks for this.
I don't actually use django_dramatiq but I just ran into the same problem with plain Django + Dramatiq. Are you wrapping any DB calls in sync_to_async?
No, I'm not using any Django async functionality in my project. I tried putting db.close_old_connections() in various places but it doesn't seem to help.
My current (horrible) solution is to restart my worker process every 15 minutes!
It could also be a regular thread somewhere (or running dramatiq with --threads x and non-thread-safe code). From what I understand, sync_to_async is just a wrapper around a ThreadPoolExecutor.
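The thread angle matters because Django keeps database connections per thread, so a cleanup call only sees the connection belonging to the thread it runs in. The sketch below models that with the standard library only: `threading.local` stands in for Django's per-thread connection storage, and `open_connection` and `close_connection` are hypothetical names.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_local = threading.local()  # stand-in for per-thread connection storage


def open_connection():
    """Open a fake connection owned by the calling thread."""
    _local.conn = f"conn-for-{threading.current_thread().name}"
    return _local.conn


def close_connection():
    """Close the calling thread's connection; return True if one was open."""
    had_conn = getattr(_local, "conn", None) is not None
    _local.conn = None
    return had_conn


executor = ThreadPoolExecutor(max_workers=1)

# A connection is opened inside the worker thread (as sync_to_async would do).
executor.submit(open_connection).result()

# Cleanup from the main thread finds nothing: the connection is not its own.
closed_from_main = close_connection()

# Cleanup submitted to the same executor runs in the worker thread and finds it.
closed_from_worker = executor.submit(close_connection).result()

executor.shutdown()
print(closed_from_main, closed_from_worker)
```

This is the same reason the workaround earlier in the thread routes db.close_old_connections() through sync_to_async(): the cleanup has to run in the thread that owns the connection.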
I'm having the same issue. The funny part is it's only on our staging system. Production seems fine. This was a lift and shift extraction of a microservice. I've been working on getting it into production. No async code.
We're using
django_dramatiq
dramatiq_sqs
sentry-dramatiq
django_periodiq
Settings
DATABASES = {"default": dj_database_url.parse(configuration.db.url, conn_max_age=500)}
.
.
.
DRAMATIQ_BROKER = {
    "BROKER": "dramatiq_sqs.SQSBroker",
    "OPTIONS": {
        "namespace": "pricing-service_tasks",
    },
    "MIDDLEWARE": [
        "dramatiq.middleware.TimeLimit",
        "dramatiq.middleware.Callbacks",
        "dramatiq.middleware.Retries",
        "django_dramatiq.middleware.DbConnectionsMiddleware",
        "periodiq.PeriodiqMiddleware",
    ],
}
Only task running
@dramatiq.actor(periodic=cron("*/1 * * * *"))
def example_task():
    """For testing Dramatiq setup in staging and prod. Will delete/revert once I'm done."""
    sleep(5)
    user, _ = User.objects.get_or_create(email="test@dramatiq.test", is_active=False)
    user.first_name = random.choice(range(1, 999999))
    user.save()
@martyphee Is it easily reproducible or somewhat random? Do you have a high throughput of tasks?
I'm also using Sentry and I wonder if that could be related.
For extra context, I'm averaging 20 tasks/second and this issue usually appears after 24-48 hours. But it appears to be entirely random.
@drpancake It seems to happen consistently on our staging servers. The picture below shows the task processors, each of which is now holding 64 connections. That is the maximum they will hold, and we get into a situation where the connection pool, for lack of a better description, freaks out and throws tons of errors saying connection is already closed. I think it might be hitting the max DB connections.
This is the only job running and it only runs once per minute as a test. There is nothing else hitting it.
It took about 24 hours to hit 64 connections per pod. I restarted the pods yesterday morning.
Sentry error
Interesting! Let me know if you make any breakthroughs. Perhaps you could try running the task more frequently to see if you hit the error sooner.
As much as I detest it, I may have to move over to Celery at some point as it's at least battle-tested for a high throughput of tasks like my use case.
FWIW, I'm having this problem with Celery and there's an issue about this here: https://github.com/celery/django-celery/issues/121
Bummer to see dramatiq having this problem too. I was just thinking I could solve so many of precisely these kinds of issues by switching over.
@mlissner That's disheartening to hear! I'm still using Dramatiq but I have a cron job that restarts the worker process every 15 minutes. Luckily for my use case tasks getting killed randomly is acceptable.
Sigh... I've been seeing this as well, 3 years later. I have 48 workers, each with a thread servicing 100 or so requests. The task is very complex and brittle (in production I can't let it fail), so I'm looking for a simple way of creating a test task to simulate the issue. The problem never happens with 16 workers, and I have pgbouncer in the mix too, so I am not even sure where to start.
I'm pushing 500+ tasks per minute through Dramatiq as part of a Django app and occasionally (once every 1-2 days) I suddenly get hundreds of instances of this error from the workers, and the only way to fix it is to restart the worker process.
Sentry reports that it's triggered in both before_process_message() and after_process_message(), when they both call Task.tasks.create_or_update_from_message().
I have CONN_MAX_AGE set to 0 and database connections are pooled via PgBouncer.
Please let me know if I'm missing any other information.