pypi / warehouse

The Python Package Index
https://pypi.org
Apache License 2.0

Celerybeat Cleanup Error #3492

Open · dstufft opened this issue 6 years ago

dstufft commented 6 years ago
worker_1         | Traceback (most recent call last):
worker_1         |   File "/opt/warehouse/lib/python3.6/site-packages/celery/beat.py", line 320, in apply_async
worker_1         |     **entry.options)
worker_1         |   File "/opt/warehouse/src/warehouse/tasks.py", line 95, in apply_async
worker_1         |     return super().apply_async(*args, **kwargs)
worker_1         |   File "/opt/warehouse/lib/python3.6/site-packages/celery/app/task.py", line 518, in apply_async
worker_1         |     check_arguments(*(args or ()), **(kwargs or {}))
worker_1         | TypeError: backend_cleanup() takes 0 positional arguments but 1 was given
worker_1         |
worker_1         | During handling of the above exception, another exception occurred:
worker_1         |
worker_1         | Traceback (most recent call last):
worker_1         |   File "/opt/warehouse/lib/python3.6/site-packages/celery/beat.py", line 222, in apply_entry
worker_1         |     result = self.apply_async(entry, producer=producer, advance=False)
worker_1         |   File "/opt/warehouse/lib/python3.6/site-packages/celery/beat.py", line 328, in apply_async
worker_1         |     entry, exc=exc)), sys.exc_info()[2])
worker_1         |   File "/opt/warehouse/lib/python3.6/site-packages/vine/five.py", line 178, in reraise
worker_1         |     raise value.with_traceback(tb)
worker_1         |   File "/opt/warehouse/lib/python3.6/site-packages/celery/beat.py", line 320, in apply_async
worker_1         |     **entry.options)
worker_1         |   File "/opt/warehouse/src/warehouse/tasks.py", line 95, in apply_async
worker_1         |     return super().apply_async(*args, **kwargs)
worker_1         |   File "/opt/warehouse/lib/python3.6/site-packages/celery/app/task.py", line 518, in apply_async
worker_1         |     check_arguments(*(args or ()), **(kwargs or {}))
worker_1         | celery.beat.SchedulingError: Couldn't apply scheduled task celery.backend_cleanup: backend_cleanup() takes 0 positional arguments but 1 was given

I assume this is an issue with how the warehouse <-> celery integration happens. Not sure what its actual effect is, though.

auvipy commented 6 years ago

May I know which version of celery is throwing this?

eevelweezel commented 6 years ago

I encountered this while building the local dev environment; it looks like it's using celery 4.1.0. It appears that apply_async is getting called with an extra arg, which it passes on to backend_cleanup. When I run the app locally, however, it looks like results are disabled. Should they be running?

worker_1 | -------------- celery@b7560aaf9e97 v4.1.0 (latentcall)
worker_1 | ---- * -----
worker_1 | --- * -- Linux-3.16.0-4-amd64-x86_64-with-debian-9.3 2018-05-15 20:27:24
worker_1 | -- - ** ---
worker_1 | - ---------- [config]
worker_1 | - ---------- .> app:         warehouse:0x7f0fbe02dac8
worker_1 | - ---------- .> transport:   amqp://guest:@rabbitmq:5672//
worker_1 | - ---------- .> results:     disabled://
worker_1 | - --- --- .> concurrency: 4 (prefork)
worker_1 | -- **** ---- .> task events: OFF (enable -E to monitor tasks in this worker)
worker_1 | --- -----
worker_1 | -------------- [queues]
worker_1 | .> celery           exchange=celery(direct) key=celery
worker_1 |
worker_1 |
worker_1 | [tasks]
worker_1 |   . warehouse.cache.origin.fastly.purge_key
worker_1 |   . warehouse.email.send_email
worker_1 |   . warehouse.email.ses.tasks.cleanup
worker_1 |   . warehouse.legacy.api.xmlrpc.cache.services.purge_tag
worker_1 |   . warehouse.packaging.tasks.compute_trending
worker_1 |   . warehouse.search.tasks.reindex
worker_1 |   . warehouse.utils.project.remove_documentation
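
As to whether results "should be running": here is a minimal sketch (assuming celery 4.x defaults and a bare app, i.e. none of warehouse's configuration) of why beat schedules celery.backend_cleanup even though the banner shows results: disabled://:

from celery import Celery

# Hypothetical bare app for illustration; not warehouse's configured app.
app = Celery("example")

print(app.conf.result_expires)          # defaults to 1 day
print(app.backend.supports_autoexpire)  # False when no result backend is configured

# Celery's beat scheduler installs a default 'celery.backend_cleanup'
# entry whenever result_expires is truthy and the backend cannot expire
# results on its own, so the cleanup task still gets scheduled even with
# the results backend disabled.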

auvipy commented 6 years ago

I will try to check warehouse with celery 4.2.x soon.

auvipy commented 4 years ago

May I know the current status of this issue?

callmecampos commented 4 years ago

Potentially unrelated, but I often see the following error with celery that blocks me from accessing localhost:

worker_1         | [2020-06-23 17:45:29,510: INFO/MainProcess] Found credentials in environment variables.
worker_1         | [2020-06-23 17:45:29,577: INFO/MainProcess] Connected to sqs://localstack:4576//
worker_1         | [2020-06-23 17:45:29,599: INFO/MainProcess] Found credentials in environment variables.
worker_1         | [2020-06-23 17:45:29,708: INFO/MainProcess] celery@0d90aa0b13cf ready.
worker_1         | [2020-06-23 17:45:31,539: INFO/Beat] beat: Starting...
worker_1         | [2020-06-23 17:45:31,541: ERROR/Beat] Removing corrupted schedule file 'celerybeat-schedule': error(11, 'Resource temporarily unavailable')
worker_1         | Traceback (most recent call last):
worker_1         |   File "/opt/warehouse/lib/python3.8/site-packages/kombu/utils/objects.py", line 42, in __get__
worker_1         |     return obj.__dict__[self.__name__]
worker_1         | KeyError: 'scheduler'
worker_1         |
worker_1         | During handling of the above exception, another exception occurred:
worker_1         |
worker_1         | Traceback (most recent call last):
worker_1         |   File "/opt/warehouse/lib/python3.8/site-packages/celery/beat.py", line 519, in setup_schedule
worker_1         |     self._store = self._open_schedule()
worker_1         |   File "/opt/warehouse/lib/python3.8/site-packages/celery/beat.py", line 509, in _open_schedule
worker_1         |     return self.persistence.open(self.schedule_filename, writeback=True)
worker_1         |   File "/usr/local/lib/python3.8/shelve.py", line 243, in open
worker_1         |     return DbfilenameShelf(filename, flag, protocol, writeback)
worker_1         |   File "/usr/local/lib/python3.8/shelve.py", line 227, in __init__
worker_1         |     Shelf.__init__(self, dbm.open(filename, flag), protocol, writeback)
worker_1         |   File "/usr/local/lib/python3.8/dbm/__init__.py", line 95, in open
worker_1         |     return mod.open(file, flag, mode)
worker_1         | _gdbm.error: [Errno 11] Resource temporarily unavailable: 'celerybeat-schedule'

di commented 4 years ago

This is due to these lines:

https://github.com/pypa/warehouse/blob/0c9ffd5ccb2171dd6141e5cf69409df3249ad805/warehouse/tasks.py#L54-L55
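
Paraphrasing those two lines (see the link above for the exact code; obj here is the task object under construction): the task's existing __header__ is wrapped in a functools.partial that pre-binds one extra positional argument.

if getattr(obj, "__header__", None) is not None:
    obj.__header__ = functools.partial(obj.__header__, object())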

Celery is using obj.__header__ to check arguments: https://github.com/celery/celery/blob/bf6139bf651b20bc04b895a5f6eb8d50320bc252/celery/app/task.py#L524-L529
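
For reference, the check on the celery side (paraphrased from the linked celery 4.x source; the tracebacks above hit the final line) is roughly:

if self.typing:
    try:
        check_arguments = self.__header__
    except AttributeError:
        pass
    else:
        check_arguments(*(args or ()), **(kwargs or {}))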

Normally this would be something like:

>>> from celery import task
>>> args = []
>>> kwargs = {}
>>> check_arguments = task.backend_cleanup.__header__
>>> check_arguments(*(args or ()), **(kwargs or {}))
1

But because __header__ is overridden with a partial, it's effectively this:

>>> from celery import task
>>> import functools
>>> args = []
>>> kwargs = {}
>>> check_arguments = functools.partial(task.backend_cleanup.__header__, object())
>>> check_arguments(*(args or ()), **(kwargs or {}))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: backend_cleanup() takes 0 positional arguments but 1 was given

It's not clear to me why we're doing this, @dstufft any ideas?

dstufft commented 4 years ago

I have no memory of why that is, and looking into the PR doesn't bring back any memories, sorry.

di commented 4 years ago

Turns out the rest of our tasks rely on this argument existing, for example:

TypeError: purge_key() missing 1 required positional argument: 'key'
  File "transaction/_transaction.py", line 376, in _call_hooks
    hook(*(prefix_args + args), **kws)
  File "warehouse/tasks.py", line 121, in _after_commit_hook
    super().apply_async(*args, **kwargs)
  File "celery/app/task.py", line 529, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
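
In other words (hypothetical signatures for illustration, not warehouse's actual task code): warehouse-style tasks expect an extra leading argument to be injected at call time, so pre-binding it in __header__ keeps their argument check consistent, while celery's zero-argument backend_cleanup trips over the same binding:

import functools

# Hypothetical signatures, for illustration only.
def purge_key(request, key):   # warehouse-style task: injected leading argument
    pass

def backend_cleanup():         # celery's builtin: takes no arguments
    pass

# With the extra argument pre-bound, purge_key's check passes:
functools.partial(purge_key, object())("some-key")

# But the same binding breaks backend_cleanup:
functools.partial(backend_cleanup, object())()
# TypeError: backend_cleanup() takes 0 positional arguments but 1 was given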