celery / django-celery

Old Celery integration project for Django
http://celery.github.com/django-celery
BSD 3-Clause "New" or "Revised" License

Worker freezes after multiple revokes (first revoke terminates) #517

Open nmgeek opened 7 years ago

nmgeek commented 7 years ago

A MySQL OperationalError is thrown when djcelery is updating the database and nothing catches it.

$ pip freeze | egrep 'celery|Django|SQL'
celery==3.1.25
Django==1.8.5
django-celery==3.2.1
MySQL-python==1.2.5

This is not a timeout problem. I can reproduce it within 10 minutes of starting the worker. The conditions to reproduce are:

I don't know why the database connection gets broken, but this is a common theme with djcelery. Exception handling code was added recently to schedulers.py and loaders.py, but this traceback unwinds through backends/database.py, so I think there should be a similar try/except there.

In my app we worked around this problem by defining a revoke handler for every task and calling django.db.connection.close() from that handler. The database update for the terminated task (the first revoked task in the sequence) succeeds, so closing the connection afterwards seems to clean up the broken connection.
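
For illustration, here is a minimal sketch of that kind of workaround. The exact hook used in the app is not shown above; this sketch assumes Celery's task_revoked signal as the hook. make_revoke_handler and FakeConnection are hypothetical names, and FakeConnection only stands in for django.db.connection so the snippet runs without Django or Celery installed.

```python
class FakeConnection(object):
    """Stand-in for django.db.connection so the sketch runs without Django."""

    def __init__(self):
        self.close_calls = 0

    def close(self):
        self.close_calls += 1


def make_revoke_handler(connection):
    """Build a handler that drops the DB connection after a task is revoked.

    Django opens a fresh connection on the next query, so closing here
    discards a connection that may already be broken.
    """
    def on_task_revoked(sender=None, request=None, terminated=None,
                        signum=None, expired=None, **kwargs):
        connection.close()
    return on_task_revoked


# In a real worker the wiring might look like this (assumed, not run here):
#   from celery.signals import task_revoked
#   from django.db import connection
#   handler = make_revoke_handler(connection)
#   task_revoked.connect(handler, weak=False)

# Simulate one revoke notification against the stand-in connection.
fake = FakeConnection()
handler = make_revoke_handler(fake)
handler(sender=None, request=None, terminated=True, signum=15, expired=False)
```

Passing weak=False in the commented wiring keeps the signal from dropping the closure once the local reference goes away.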

Traceback (most recent call last):
  File "/mnt/virt/local/lib/python2.7/site-packages/celery/worker/__init__.py", line 206, in start
    self.blueprint.start(self)
  File "/mnt/virt/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/mnt/virt/local/lib/python2.7/site-packages/celery/bootsteps.py", line 374, in start
    return self.obj.start()
  File "/mnt/virt/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 280, in start
    blueprint.start(self)
  File "/mnt/virt/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/mnt/virt/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 884, in start
    c.loop(*c.loop_args())
  File "/mnt/virt/local/lib/python2.7/site-packages/celery/worker/loops.py", line 76, in asynloop
    next(loop)
  File "/mnt/virt/local/lib/python2.7/site-packages/kombu/async/hub.py", line 279, in create_loop
    item()
  File "/mnt/virt/local/lib/python2.7/site-packages/amqp/utils.py", line 42, in __call__
    self.set_error_state(exc)
  File "/mnt/virt/local/lib/python2.7/site-packages/amqp/utils.py", line 39, in __call__
    **dict(self.kwargs, **kwargs) if self.kwargs else kwargs
  File "/mnt/virt/local/lib/python2.7/site-packages/kombu/connection.py", line 288, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "/mnt/virt/local/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 95, in drain_events
    return connection.drain_events(**kwargs)
  File "/mnt/virt/local/lib/python2.7/site-packages/amqp/connection.py", line 326, in drain_events
    return amqp_method(channel, args, content)
  File "/mnt/virt/local/lib/python2.7/site-packages/amqp/channel.py", line 1909, in _basic_deliver
    fun(msg)
  File "/mnt/virt/local/lib/python2.7/site-packages/kombu/messaging.py", line 598, in _receive_callback
    return on_m(message) if on_m else self.receive(decoded, message)
  File "/mnt/virt/local/lib/python2.7/site-packages/kombu/messaging.py", line 564, in receive
    [callback(body, message) for callback in callbacks]
  File "/mnt/virt/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 468, in on_task_received
    callbacks)
  File "/mnt/virt/local/lib/python2.7/site-packages/celery/worker/strategy.py", line 56, in task_message_handler
    if req.revoked():
  File "/mnt/virt/local/lib/python2.7/site-packages/celery/worker/job.py", line 338, in revoked
    'expired' if expired else 'revoked', False, None, expired,
  File "/mnt/virt/local/lib/python2.7/site-packages/celery/worker/job.py", line 322, in _announce_revoked
    self.task.backend.mark_as_revoked(self.id, reason, request=self)
  File "/mnt/virt/local/lib/python2.7/site-packages/celery/backends/base.py", line 169, in mark_as_revoked
    request=request)
  File "/mnt/virt/local/lib/python2.7/site-packages/celery/backends/base.py", line 271, in store_result
    request=request, **kwargs)
  File "/mnt/virt/local/lib/python2.7/site-packages/djcelery/backends/database.py", line 29, in _store_result
    traceback=traceback, children=self.current_task_children(request),
  File "/mnt/virt/local/lib/python2.7/site-packages/djcelery/managers.py", line 42, in _inner
    return fun(*args, **kwargs)
  File "/mnt/virt/local/lib/python2.7/site-packages/djcelery/managers.py", line 181, in store_result
    'meta': {'children': children}})
  File "/mnt/virt/local/lib/python2.7/site-packages/djcelery/managers.py", line 87, in update_or_create
    return get_queryset(self).update_or_create(**kwargs)
  File "/mnt/virt/local/lib/python2.7/site-packages/djcelery/managers.py", line 70, in update_or_create
    obj, created = self.get_or_create(**kwargs)
  File "/mnt/virt/local/lib/python2.7/site-packages/django/db/models/query.py", line 405, in get_or_create
    return self.get(**lookup), False
  File "/mnt/virt/local/lib/python2.7/site-packages/django/db/models/query.py", line 328, in get
    num = len(clone)
  File "/mnt/virt/local/lib/python2.7/site-packages/django/db/models/query.py", line 144, in __len__
    self._fetch_all()
  File "/mnt/virt/local/lib/python2.7/site-packages/django/db/models/query.py", line 965, in _fetch_all
    self._result_cache = list(self.iterator())
  File "/mnt/virt/local/lib/python2.7/site-packages/django/db/models/query.py", line 238, in iterator
    results = compiler.execute_sql()
  File "/mnt/virt/local/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 840, in execute_sql
    cursor.execute(sql, params)
  File "/mnt/virt/local/lib/python2.7/site-packages/django/db/backends/utils.py", line 79, in execute
    return super(CursorDebugWrapper, self).execute(sql, params)
  File "/mnt/virt/local/lib/python2.7/site-packages/django/db/backends/utils.py", line 64, in execute
    return self.cursor.execute(sql, params)
  File "/mnt/virt/local/lib/python2.7/site-packages/django/db/utils.py", line 97, in __exit__
    six.reraise(dj_exc_type, dj_exc_value, traceback)
  File "/mnt/virt/local/lib/python2.7/site-packages/django/db/backends/utils.py", line 64, in execute
    return self.cursor.execute(sql, params)
  File "/mnt/virt/local/lib/python2.7/site-packages/django/db/backends/mysql/base.py", line 124, in execute
    return self.cursor.execute(query, args)
  File "/mnt/virt/local/lib/python2.7/site-packages/MySQLdb/cursors.py", line 205, in execute
    self.errorhandler(self, exc, value)
  File "/mnt/virt/local/lib/python2.7/site-packages/MySQLdb/connections.py", line 36, in defaulterrorhandler
    raise errorclass, errorvalue
OperationalError: (2006, 'MySQL server has gone away')

nmgeek commented 7 years ago

I noticed that the SQLAlchemy backend wraps calls like Database._store_result() in a retry decorator, which catches exceptions like this one and tries again. That would fix this problem, would it not?

See https://github.com/celery/celery/blob/d90caee6d91a0fcc91756329503a35bf8fef720a/celery/backends/database/__init__.py#L45
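
To make the idea concrete, here is a rough sketch of what a retry wrapper could look like. This is not the Celery or djcelery code: retry_on_db_error, FlakyStore and the local OperationalError class are hypothetical stand-ins so the snippet runs on its own. In a Django setup the caught exception would presumably be django.db.utils.OperationalError and the reset step something like django.db.connection.close().

```python
import functools


class OperationalError(Exception):
    """Stand-in for django.db.utils.OperationalError (hypothetical here)."""


def retry_on_db_error(max_retries=3, on_error=None):
    """Retry the wrapped call when the database connection has gone away.

    on_error runs before each retry; in a Django setup it could be
    django.db.connection.close, so that the next query reconnects.
    """
    def decorator(fun):
        @functools.wraps(fun)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return fun(*args, **kwargs)
                except OperationalError:
                    if attempt == max_retries - 1:
                        raise  # out of retries: surface the original error
                    if on_error is not None:
                        on_error()
        return wrapper
    return decorator


class FlakyStore(object):
    """Hypothetical result store whose connection is broken until reset."""

    def __init__(self):
        self.broken = True
        self.calls = 0
        self.saved = {}

    def reset(self):
        self.broken = False

    def store_result(self, task_id, status):
        self.calls += 1
        if self.broken:
            raise OperationalError(2006, 'MySQL server has gone away')
        self.saved[task_id] = status
        return status


store = FlakyStore()
safe_store = retry_on_db_error(max_retries=3, on_error=store.reset)(
    store.store_result)
result = safe_store('abc123', 'REVOKED')
```

The first attempt fails with the simulated 2006 error, the reset hook runs, and the second attempt stores the result. A retry like this hides the symptom but does not explain why the connection broke.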

Here is a theory as to why the db connection gets broken in the first place. This is conjecture. (If the root cause could be found, then no backend, even SQLAlchemy, would need the less-than-optimal retry fix.)

And why would the parent and child process both be disconnected? It could be that when the child process forked, it got a copy of the same db connection. If that connection is closed in the child, it may no longer be valid in the parent.
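
The shared-connection conjecture can be illustrated with plain sockets. This is only a sketch under assumptions: it needs a Unix system with os.fork, a socket pair stands in for the MySQL connection, and the b'QUIT' message is a made-up placeholder for whatever goodbye a client library might send when it closes a connection. It says nothing about what djcelery actually does.

```python
import os
import socket


def shared_connection_demo():
    """Show a forked child breaking a connection the parent still holds."""
    # client: the "db connection" opened before the fork.
    # server: plays the database server end of that connection.
    client, server = socket.socketpair()

    pid = os.fork()
    if pid == 0:
        # Child process: it inherited the very same underlying socket.
        try:
            server.close()
            client.sendall(b'QUIT')  # hypothetical goodbye sent on close
            client.close()
        finally:
            os._exit(0)  # never fall through into the parent's code

    os.waitpid(pid, 0)

    # "Server": sees the goodbye and hangs up the whole connection.
    server_saw = server.recv(16)
    server.close()

    # Parent: never closed its copy, yet the peer is gone.
    parent_broken = False
    try:
        client.sendall(b'SELECT 1')
    except OSError:
        parent_broken = True
    finally:
        client.close()
    return server_saw, parent_broken


server_saw, parent_broken = shared_connection_demo()
```

In this toy setup the parent's next write fails even though the parent itself never closed anything, which is the kind of failure the conjecture describes.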

I do see that the child process db connections are closed using a hook called on_worker_process_init in the backend loader class. But that would be in the child process and I believe our traceback is coming from the parent process.

At https://github.com/celery/django-celery/blob/3.1/djcelery/loaders.py#L150 you will find some hackery which may lead to the db connections being closed without proper bookkeeping at the Django db connection level. But I think that is in the child process.