celery / celery

Distributed Task Queue (development branch)
https://docs.celeryq.dev

Django eventlet patch is broken #5924

Open WisdomPill opened 4 years ago

WisdomPill commented 4 years ago

Hello!

I have a lot of HTTP requests to make in my Django project, which uses Celery for task processing.

I tried using eventlet and gevent as the pool executor and immediately ran into problems with eventlet. Gevent seems to work; I will give it more data to work on in the near future to see how it behaves.

I've read in the source code that both should patch themselves on Celery init.

The error that I got with eventlet is:

DatabaseWrapper objects created in a thread can only be used in that same thread. The object with alias 'default' was created in thread id x and this is thread id y.

These are the most important requirements of the project:

aioredis==1.3.1
amqp==2.5.2
asgiref==3.2.3
Babel==2.8.0
billiard==3.6.1.0
celery==4.4.0
channels==2.4.0
channels-redis==2.4.1
daphne==2.4.1
decorator==4.4.1
Django==3.0.2
django-redis==4.11.0
dnspython==1.16.0
eventlet==0.25.1
flower==0.9.3
gevent==1.4.0
greenlet==0.4.15
hiredis==1.0.1
kombu==4.6.7
psycopg2-binary==2.8.4
redis==3.3.11
vine==1.3.0

These are my celery settings

CELERY_BROKER_URL = 'redis://localhost:6379/1'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
CELERY_RESULT_EXPIRES = timedelta(hours=6)
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = TIME_ZONE
CELERY_WORKER_PREFETCH_MULTIPLIER = 1

I've read on Stack Overflow and elsewhere that it is mandatory to monkey-patch for eventlet before anything else, but that seems to be taken care of here: https://github.com/celery/celery/blob/d0563058f8f47f347ac1b56c44f833f569764482/celery/__init__.py#L103
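For what it's worth, the workaround I keep seeing suggested is to monkey-patch manually at the very top of the project's celery.py, before Django or the database driver are imported. A minimal sketch (proj and the module layout are just my own naming, adjust as needed):

# proj/celery.py - patch before anything else gets imported
import eventlet
eventlet.monkey_patch()  # must run before django, psycopg2, etc. are imported

import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()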

If you need further information, just let me know.

I'm open to helping with coding and testing the eventlet pool worker.

auvipy commented 4 years ago

We are open to improvements.

webjunkie commented 4 years ago

I can confirm that this seems to be connected to Django 3. It still works for me with eventlet and Django 2.

azaitsev commented 4 years ago

Same issue here.

django==3.0.3 eventlet==0.25.1

zmej-serow commented 4 years ago

Same issue on Docker running under Windows 10 (Alpine Linux images).

I've tried to monkey_patch() Django (in manage.py), but it doesn't have any effect. Celery tasks, by the way, are running and finishing perfectly (though I don't touch the DB inside tasks yet). I'm just getting this annoying message in the logs for every on_task_postrun() and on_task_prerun():

Signal handler <bound method DjangoWorkerFixup.on_task_postrun of <celery.fixups.django.DjangoWorkerFixup object at 0x7efce0c32650>> raised: DatabaseError("DatabaseWrapper objects created in a thread can only be used in that same thread. The object with alias 'default' was created in thread id 139624590013200 and this is thread id 139624549594400.")

Django==3.0 eventlet==0.25.1 greenlet==0.4.15 celery==4.4.0

Achilles718611 commented 4 years ago

I just updated Django to 3.x, and when I try to run Celery I get the following error: django.db.utils.DatabaseError: DatabaseWrapper objects created in a thread can only be used in that same thread. The object with alias 'default' was created in thread id 2308608235216 and this is thread id 2308821021960.

Here is the detail. The command: celery -A proj worker --pool eventlet --concurrency=20 -l info

Django==3.0.6 celery==4.4.2 eventlet==0.25.2 greenlet==0.4.15 gunicorn==20.0.4

I don't think it's a gunicorn problem. It seems related to Django 3 and Celery.

sakyoud-zakaria commented 4 years ago

This is what worked for me: I updated Celery and eventlet to the latest versions and then added CELERY_TASK_ALWAYS_EAGER to the Celery configuration.

# celery
CELERY_BROKER_URL = 'redis://127.0.0.1:6379'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_STORE_ERRORS_EVEN_IF_IGNORED = True
CELERY_TASK_ALWAYS_EAGER = True

auvipy commented 4 years ago

I updated Celery and eventlet to the latest versions and then added CELERY_TASK_ALWAYS_EAGER to the Celery configuration

# celery
CELERY_BROKER_URL = 'redis://127.0.0.1:6379'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_STORE_ERRORS_EVEN_IF_IGNORED = True
CELERY_TASK_ALWAYS_EAGER = True

What did you get? A traceback?

sakyoud-zakaria commented 4 years ago

I updated Celery and eventlet to the latest versions and then added CELERY_TASK_ALWAYS_EAGER to the Celery configuration

# celery
CELERY_BROKER_URL = 'redis://127.0.0.1:6379'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_STORE_ERRORS_EVEN_IF_IGNORED = True
CELERY_TASK_ALWAYS_EAGER = True

What did you get? A traceback?

I forgot to mention that this is what worked for me, sorry.

xrmx commented 4 years ago

@auvipy here's a stacktrace if you need one:

DatabaseError: DatabaseWrapper objects created in a thread can only be used in that same thread. The object with alias 'default' was created in thread id 140201002996392 and this is thread id 140200793612968.
  File "celery/utils/dispatch/signal.py", line 288, in send
    response = receiver(signal=self, sender=sender, **named)
  File "celery/fixups/django.py", line 172, in on_task_postrun
    self.close_database()
  File "celery/fixups/django.py", line 177, in close_database
    return self._close_database()
  File "celery/fixups/django.py", line 186, in _close_database
    conn.close()
  File "django/utils/asyncio.py", line 26, in inner
    return func(*args, **kwargs)
  File "django/db/backends/base/base.py", line 286, in close
    self.validate_thread_sharing()
  File "django/db/backends/base/base.py", line 558, in validate_thread_sharing
    % (self.alias, self._thread_ident, _thread.get_ident())
auvipy commented 4 years ago

Seems like Django's asyncio machinery and Celery's eventlet pool are conflicting?

xrmx commented 4 years ago

Yeah, though I am puzzled as to why asyncio should be involved at all.
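As far as I can tell, django/utils/asyncio.py only appears because Django 3.0 wraps connection methods in its async_unsafe decorator; the check that actually fails is validate_thread_sharing. Roughly paraphrased (a sketch from memory, not the verbatim Django source):

import _thread

class BaseDatabaseWrapper:
    def __init__(self, alias='default'):
        self.alias = alias
        self.allow_thread_sharing = False
        # Recorded at creation time. Under eventlet, whether this is a real
        # thread id or a greenlet id depends on when monkey-patching ran.
        self._thread_ident = _thread.get_ident()

    def validate_thread_sharing(self):
        # Django raises django.db.utils.DatabaseError with this message.
        if not self.allow_thread_sharing and self._thread_ident != _thread.get_ident():
            raise RuntimeError(
                "DatabaseWrapper objects created in a thread can only be used "
                "in that same thread. The object with alias '%s' was created "
                "in thread id %s and this is thread id %s."
                % (self.alias, self._thread_ident, _thread.get_ident())
            )

So the connection remembers the ident it was created under; once monkey-patching makes every greenlet report its own ident, any connection created in a different greenlet trips the check.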

SuperMasterBlasterLaser commented 4 years ago

Hello everyone, this problem still persists.

Django==3.0.7 Celery==4.4.6 eventlet==0.25.2

When I call a task like this:

my_task.apply_async((instance.id,), connection=Connection(settings.CELERY_BROKER_URL))

It throws this:

File "/envs/lib/python3.6/site-packages/eventlet/greenpool.py", line 88, in _spawn_n_implfunc(*args, **kwargs)
File "/envs/lib/python3.6/site-packages/celery/concurrency/eventlet.py", line 35, in apply_targetpid=getpid())
File "/envs/lib/python3.6/site-packages/celery/concurrency/base.py", line 32, in apply_targetret = target(*args, **kwargs)
File "/envs/lib/python3.6/site-packages/celery/app/trace.py", line 612, in _fast_trace_taskuuid, args, kwargs, request,
File "/envs/lib/python3.6/site-packages/celery/app/trace.py", line 531, in trace_taskI, _, _, _ = on_error(task_request, exc, uuid)
File "/envs/lib/python3.6/site-packages/celery/app/trace.py", line 366, in on_errortask, request, eager=eager, call_errbacks=call_errbacks,
File "/envs/lib/python3.6/site-packages/celery/app/trace.py", line 173, in handle_error_statecall_errbacks=call_errbacks)
File "/envs/lib/python3.6/site-packages/celery/app/trace.py", line 218, in handle_failurecall_errbacks=call_errbacks,
File "/envs/lib/python3.6/site-packages/celery/backends/base.py", line 169, in mark_as_failuretraceback=traceback, request=request)
File "/envs/lib/python3.6/site-packages/celery/backends/base.py", line 443, in store_resultrequest=request, **kwargs)
File "/envs/lib/python3.6/site-packages/django_celery_results/backends/database.py", line 40, in _store_resultusing=using,
File "/envs/lib/python3.6/site-packages/django_celery_results/managers.py", line 52, in _innerreturn fun(*args, **kwargs)
File "/envs/lib/python3.6/site-packages/django_celery_results/managers.py", line 138, in store_resultdefaults=fields)
File "/envs/lib/python3.6/site-packages/django/db/models/query.py", line 559, in get_or_createreturn self.get(**kwargs), False
File "/envs/lib/python3.6/site-packages/django/db/models/query.py", line 411, in getnum = len(clone)
File "/envs/lib/python3.6/site-packages/django/db/models/query.py", line 258, in __len__self._fetch_all()
File "/envs/lib/python3.6/site-packages/django/db/models/query.py", line 1261, in _fetch_allself._result_cache = list(self._iterable_class(self))
File "/envs/lib/python3.6/site-packages/django/db/models/query.py", line 57, in __iter__results = compiler.execute_sql(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size)
File "/envs/lib/python3.6/site-packages/django/db/models/sql/compiler.py", line 1150, in execute_sqlcursor = self.connection.cursor()
File "/envs/lib/python3.6/site-packages/django/utils/asyncio.py", line 26, in innerreturn func(*args, **kwargs)
File "/envs/lib/python3.6/site-packages/django/db/backends/base/base.py", line 260, in cursorreturn self._cursor()
File "/envs/lib/python3.6/site-packages/django/db/backends/base/base.py", line 238, in _cursorreturn self._prepare_cursor(self.create_cursor(name))
File "/envs/lib/python3.6/site-packages/django/db/backends/base/base.py", line 228, in _prepare_cursorself.validate_thread_sharing()
File "/envs/lib/python3.6/site-packages/django/db/backends/base/base.py", line 558, in validate_thread_sharing% (self.alias, self._thread_ident, _thread.get_ident())
django.db.utils.DatabaseErrorDatabaseWrapper objects created in a thread can only be used in that same thread. The object with alias 'default' was created in thread id 140439236878864 and this is thread id 140439103430128.

I see django/utils/asyncio.py in the traceback. I know that Django 3 now tries to use async features, but how does that affect Celery?

oaosman84 commented 4 years ago

CELERY_TASK_ALWAYS_EAGER = True

I tried this but was still seeing the same error. Also, it's probably not a good idea since it just means "tasks will be executed locally instead of being sent to the queue", which obviously isn't what we want.
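To illustrate: with eager mode on, .delay() runs the task inline in the calling process and returns an EagerResult, so the broker and the eventlet pool are never exercised and the thread-sharing bug is merely hidden. A quick sketch (the add task is made up):

from celery import Celery

app = Celery('demo', broker='redis://localhost:6379/1')
app.conf.task_always_eager = True

@app.task
def add(x, y):
    return x + y

result = add.delay(2, 3)  # executes synchronously in the caller
print(result.get())       # 5 - no broker, no worker, no pool involved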

Has anyone else been able to solve this? (We're using redis as broker)

I gave up and just ended up using gevent instead of eventlet

thedrow commented 4 years ago

CELERY_TASK_ALWAYS_EAGER = True

I tried this but was still seeing the same error. Also, it's probably not a good idea since it just means "tasks will be executed locally instead of being sent to the queue", which obviously isn't what we want.

Has anyone else been able to solve this? (We're using redis as broker)

I gave up and just ended up using gevent instead of eventlet

We don't have the resources to look into it. gevent is more maintained than eventlet, AFAIK.

mzw4 commented 4 years ago

Same issue for me on Celery 4.4.7, Django 3.0.9.

Switching to gevent solved it.
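For anyone following along, the only change needed on our side was the pool flag (assuming a project named proj, as in the earlier comments):

pip install gevent
celery -A proj worker --pool gevent --concurrency=20 -l info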

baranbartu commented 4 years ago

Same issue for me on:

Django==3.1.1 celery==4.4.7 eventlet==0.28.0

Switching to gevent (20.9.0) solved it.

aaditya-ridecell commented 1 year ago

To everyone who plans to switch over (or has already switched over) from eventlet to gevent due to the Django 3.x incompatibility with eventlet: please run a benchmark with and without the psycopg patch from psycogreen. Eventlet supports psycopg out of the box, whereas gevent does not. We saw a significant performance uplift with the patch compared to the unpatched version, largely matching eventlet's performance; a sketch of the patch follows.
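For reference, the patch itself is tiny. We apply it before Django or psycopg2 create any connections; a sketch assuming the gevent pool and psycogreen installed via pip install psycogreen (where exactly to run it depends on your setup):

from gevent import monkey
monkey.patch_all()  # Celery's gevent pool also patches, but doing it early is safest

from psycogreen.gevent import patch_psycopg
patch_psycopg()  # makes psycopg2 yield to the gevent hub instead of blocking the worker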

Thanks to everyone for suggesting to transition to Gevent and sharing all the findings. Hat tip to @personalcomputer for the explanation of why this patch works.