celery / celery

Distributed Task Queue (development branch)
https://docs.celeryq.dev
Other
24.95k stars 4.68k forks source link

`rate_limit` limits other tasks in the queue as well #8119

Open erfaan opened 1 year ago

erfaan commented 1 year ago

Checklist

Mandatory Debugging Information

Optional Debugging Information

Related Issues and Possible Duplicates

Related Issues

Possible Duplicates

Environment & Settings

Celery version: 5.2.7 (dawn-chorus)

celery report Output:

``` software -> celery:5.2.7 (dawn-chorus) kombu:5.2.4 py:3.8.9 billiard:3.6.4.0 redis:4.5.1 platform -> system:Darwin arch:64bit kernel version:21.5.0 imp:CPython loader -> celery.loaders.app.AppLoader settings -> transport:redis results:disabled ABSOLUTE_URL_OVERRIDES: { } ADMINS: [] ALLOWED_HOSTS: [] APPEND_SLASH: True AUTHENTICATION_BACKENDS: ['django.contrib.auth.backends.ModelBackend'] AUTH_PASSWORD_VALIDATORS: '********' AUTH_USER_MODEL: 'auth.User' BASE_DIR: '/XXXXX/celery-bug/my_celery_project' CACHES: { 'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}} CACHE_MIDDLEWARE_ALIAS: 'default' CACHE_MIDDLEWARE_KEY_PREFIX: '********' CACHE_MIDDLEWARE_SECONDS: 600 CELERY_BROKER_URL: 'redis://localhost:6379/0' CELERY_TASK_ANNOTATIONS: { 'my_app.tasks.task_a': {'rate_limit': '10/m'}} CSRF_COOKIE_AGE: 31449600 CSRF_COOKIE_DOMAIN: None CSRF_COOKIE_HTTPONLY: False CSRF_COOKIE_NAME: 'csrftoken' CSRF_COOKIE_PATH: '/' CSRF_COOKIE_SAMESITE: 'Lax' CSRF_COOKIE_SECURE: False CSRF_FAILURE_VIEW: 'django.views.csrf.csrf_failure' CSRF_HEADER_NAME: 'HTTP_X_CSRFTOKEN' CSRF_TRUSTED_ORIGINS: [] CSRF_USE_SESSIONS: False DATABASES: { 'default': { 'ATOMIC_REQUESTS': False, 'AUTOCOMMIT': True, 'CONN_MAX_AGE': 0, 'ENGINE': 'django.db.backends.sqlite3', 'HOST': '', 'NAME': '/XXXXXX/my_celery_project/db.sqlite3', 'OPTIONS': {}, 'PASSWORD': '********', 'PORT': '', 'TEST': { 'CHARSET': None, 'COLLATION': None, 'MIRROR': None, 'NAME': None}, 'TIME_ZONE': None, 'USER': ''}} DATABASE_ROUTERS: '********' DATA_UPLOAD_MAX_MEMORY_SIZE: 2621440 DATA_UPLOAD_MAX_NUMBER_FIELDS: 1000 DATETIME_FORMAT: 'N j, Y, P' DATETIME_INPUT_FORMATS: ['%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M', '%Y-%m-%d', '%m/%d/%Y %H:%M:%S', '%m/%d/%Y %H:%M:%S.%f', '%m/%d/%Y %H:%M', '%m/%d/%Y', '%m/%d/%y %H:%M:%S', '%m/%d/%y %H:%M:%S.%f', '%m/%d/%y %H:%M', '%m/%d/%y'] DATE_FORMAT: 'N j, Y' DATE_INPUT_FORMATS: ['%Y-%m-%d', '%m/%d/%Y', '%m/%d/%y', '%b %d %Y', '%b %d, %Y', '%d %b %Y', '%d %b, %Y', '%B %d %Y', '%B %d, %Y', '%d %B %Y', '%d %B, %Y'] DEBUG: True DEBUG_PROPAGATE_EXCEPTIONS: False DECIMAL_SEPARATOR: '.' DEFAULT_CHARSET: 'utf-8' DEFAULT_CONTENT_TYPE: 'text/html' DEFAULT_EXCEPTION_REPORTER_FILTER: 'django.views.debug.SafeExceptionReporterFilter' DEFAULT_FILE_STORAGE: 'django.core.files.storage.FileSystemStorage' DEFAULT_FROM_EMAIL: 'webmaster@localhost' DEFAULT_INDEX_TABLESPACE: '' DEFAULT_TABLESPACE: '' DISALLOWED_USER_AGENTS: [] EMAIL_BACKEND: 'django.core.mail.backends.smtp.EmailBackend' EMAIL_HOST: 'localhost' EMAIL_HOST_PASSWORD: '********' EMAIL_HOST_USER: '' EMAIL_PORT: 25 EMAIL_SSL_CERTFILE: None EMAIL_SSL_KEYFILE: '********' EMAIL_SUBJECT_PREFIX: '[Django] ' EMAIL_TIMEOUT: None EMAIL_USE_LOCALTIME: False EMAIL_USE_SSL: False EMAIL_USE_TLS: False FILE_CHARSET: 'utf-8' FILE_UPLOAD_DIRECTORY_PERMISSIONS: None FILE_UPLOAD_HANDLERS: ['django.core.files.uploadhandler.MemoryFileUploadHandler', 'django.core.files.uploadhandler.TemporaryFileUploadHandler'] FILE_UPLOAD_MAX_MEMORY_SIZE: 2621440 FILE_UPLOAD_PERMISSIONS: None FILE_UPLOAD_TEMP_DIR: None FIRST_DAY_OF_WEEK: 0 FIXTURE_DIRS: [] FORCE_SCRIPT_NAME: None FORMAT_MODULE_PATH: None FORM_RENDERER: 'django.forms.renderers.DjangoTemplates' IGNORABLE_404_URLS: [] INSTALLED_APPS: ['django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'my_app'] INTERNAL_IPS: [] LANGUAGES: [('af', 'Afrikaans'), ('ar', 'Arabic'), ('ast', 'Asturian'), ('az', 'Azerbaijani'), ('bg', 'Bulgarian'), ('be', 'Belarusian'), ('bn', 'Bengali'), ('br', 'Breton'), ('bs', 'Bosnian'), ('ca', 'Catalan'), ('cs', 'Czech'), ('cy', 'Welsh'), ('da', 'Danish'), ('de', 'German'), ('dsb', 'Lower Sorbian'), ('el', 'Greek'), ('en', 'English'), ('en-au', 'Australian English'), ('en-gb', 'British English'), ('eo', 'Esperanto'), ('es', 'Spanish'), ('es-ar', 'Argentinian Spanish'), ('es-co', 'Colombian Spanish'), ('es-mx', 'Mexican Spanish'), ('es-ni', 'Nicaraguan Spanish'), ('es-ve', 'Venezuelan Spanish'), ('et', 'Estonian'), ('eu', 'Basque'), ('fa', 'Persian'), ('fi', 'Finnish'), ('fr', 'French'), ('fy', 'Frisian'), ('ga', 'Irish'), ('gd', 'Scottish Gaelic'), ('gl', 'Galician'), ('he', 'Hebrew'), ('hi', 'Hindi'), ('hr', 'Croatian'), ('hsb', 'Upper Sorbian'), ('hu', 'Hungarian'), ('hy', 'Armenian'), ('ia', 'Interlingua'), ('id', 'Indonesian'), ('io', 'Ido'), ('is', 'Icelandic'), ('it', 'Italian'), ('ja', 'Japanese'), ('ka', 'Georgian'), ('kab', 'Kabyle'), ('kk', 'Kazakh'), ('km', 'Khmer'), ('kn', 'Kannada'), ('ko', 'Korean'), ('lb', 'Luxembourgish'), ('lt', 'Lithuanian'), ('lv', 'Latvian'), ('mk', 'Macedonian'), ('ml', 'Malayalam'), ('mn', 'Mongolian'), ('mr', 'Marathi'), ('my', 'Burmese'), ('nb', 'Norwegian Bokmål'), ('ne', 'Nepali'), ('nl', 'Dutch'), ('nn', 'Norwegian Nynorsk'), ('os', 'Ossetic'), ('pa', 'Punjabi'), ('pl', 'Polish'), ('pt', 'Portuguese'), ('pt-br', 'Brazilian Portuguese'), ('ro', 'Romanian'), ('ru', 'Russian'), ('sk', 'Slovak'), ('sl', 'Slovenian'), ('sq', 'Albanian'), ('sr', 'Serbian'), ('sr-latn', 'Serbian Latin'), ('sv', 'Swedish'), ('sw', 'Swahili'), ('ta', 'Tamil'), ('te', 'Telugu'), ('th', 'Thai'), ('tr', 'Turkish'), ('tt', 'Tatar'), ('udm', 'Udmurt'), ('uk', 'Ukrainian'), ('ur', 'Urdu'), ('vi', 'Vietnamese'), ('zh-hans', 'Simplified Chinese'), ('zh-hant', 'Traditional Chinese')] LANGUAGES_BIDI: ['he', 'ar', 'fa', 'ur'] LANGUAGE_CODE: 'en-us' LANGUAGE_COOKIE_AGE: None LANGUAGE_COOKIE_DOMAIN: None LANGUAGE_COOKIE_NAME: 'django_language' LANGUAGE_COOKIE_PATH: '/' LOCALE_PATHS: [] LOGGING: { } LOGGING_CONFIG: 'logging.config.dictConfig' LOGIN_REDIRECT_URL: '/accounts/profile/' LOGIN_URL: '/accounts/login/' LOGOUT_REDIRECT_URL: None MANAGERS: [] MEDIA_ROOT: '' MEDIA_URL: '' MESSAGE_STORAGE: 'django.contrib.messages.storage.fallback.FallbackStorage' MIDDLEWARE: ['django.middleware.security.SecurityMiddleware', 'django.contrib.sessions.middleware.SessionMiddleware', 'django.middleware.common.CommonMiddleware', 'django.middleware.csrf.CsrfViewMiddleware', 'django.contrib.auth.middleware.AuthenticationMiddleware', 'django.contrib.messages.middleware.MessageMiddleware', 'django.middleware.clickjacking.XFrameOptionsMiddleware'] MIGRATION_MODULES: { } MONTH_DAY_FORMAT: 'F j' NUMBER_GROUPING: 0 PASSWORD_HASHERS: '********' PASSWORD_RESET_TIMEOUT_DAYS: '********' PREPEND_WWW: False ROOT_URLCONF: 'my_celery_project.urls' SECRET_KEY: '********' SECURE_BROWSER_XSS_FILTER: False SECURE_CONTENT_TYPE_NOSNIFF: False SECURE_HSTS_INCLUDE_SUBDOMAINS: False SECURE_HSTS_PRELOAD: False SECURE_HSTS_SECONDS: 0 SECURE_PROXY_SSL_HEADER: None SECURE_REDIRECT_EXEMPT: [] SECURE_SSL_HOST: None SECURE_SSL_REDIRECT: False SERVER_EMAIL: 'root@localhost' SESSION_CACHE_ALIAS: 'default' SESSION_COOKIE_AGE: 1209600 SESSION_COOKIE_DOMAIN: None SESSION_COOKIE_HTTPONLY: True SESSION_COOKIE_NAME: 'sessionid' SESSION_COOKIE_PATH: '/' SESSION_COOKIE_SAMESITE: 'Lax' SESSION_COOKIE_SECURE: False SESSION_ENGINE: 'django.contrib.sessions.backends.db' SESSION_EXPIRE_AT_BROWSER_CLOSE: False SESSION_FILE_PATH: None SESSION_SAVE_EVERY_REQUEST: False SESSION_SERIALIZER: 'django.contrib.sessions.serializers.JSONSerializer' SETTINGS_MODULE: 'my_celery_project.settings' SHORT_DATETIME_FORMAT: 'm/d/Y P' SHORT_DATE_FORMAT: 'm/d/Y' SIGNING_BACKEND: 'django.core.signing.TimestampSigner' SILENCED_SYSTEM_CHECKS: [] STATICFILES_DIRS: [] STATICFILES_FINDERS: ['django.contrib.staticfiles.finders.FileSystemFinder', 'django.contrib.staticfiles.finders.AppDirectoriesFinder'] STATICFILES_STORAGE: 'django.contrib.staticfiles.storage.StaticFilesStorage' STATIC_ROOT: None STATIC_URL: '/static/' TEMPLATES: [{'APP_DIRS': True, 'BACKEND': 'django.template.backends.django.DjangoTemplates', 'DIRS': [], 'OPTIONS': {'context_processors': ['django.template.context_processors.debug', 'django.template.context_processors.request', 'django.contrib.auth.context_processors.auth', 'django.contrib.messages.context_processors.messages']}}] TEST_NON_SERIALIZED_APPS: [] TEST_RUNNER: 'django.test.runner.DiscoverRunner' THOUSAND_SEPARATOR: ',' TIME_FORMAT: 'P' TIME_INPUT_FORMATS: ['%H:%M:%S', '%H:%M:%S.%f', '%H:%M'] TIME_ZONE: 'UTC' USE_I18N: True USE_L10N: True USE_THOUSAND_SEPARATOR: False USE_TZ: True USE_X_FORWARDED_HOST: False USE_X_FORWARDED_PORT: False WSGI_APPLICATION: 'my_celery_project.wsgi.application' X_FRAME_OPTIONS: 'SAMEORIGIN' YEAR_MONTH_FORMAT: 'F Y' is_overridden: > deprecated_settings: None ```

Steps to Reproduce

Required Dependencies

Python Packages

pip freeze Output:

``` amqp==5.1.1 async-timeout==4.0.2 billiard==3.6.4.0 celery==5.2.7 click==8.1.3 click-didyoumean==0.3.0 click-plugins==1.1.1 click-repl==0.2.0 Django==2.2.14 future==0.18.3 kombu==5.2.4 prompt-toolkit==3.0.38 pytz==2022.7.1 redis==4.5.1 six==1.16.0 sqlparse==0.4.3 vine==5.0.0 wcwidth==0.2.6 ```

Other Dependencies

N/A

Minimally Reproducible Test Case

Lets say we have a couple of very basic tasks ```python import datetime from celery import shared_task @shared_task def task_a(): print(f"task_a {datetime.datetime.now()}") @shared_task def task_b(): print(f"task_b {datetime.datetime.now()}") ``` and we configure `task_annotations` for one task only: `CELERY_TASK_ANNOTATIONS = {'my_app.tasks.task_a': {'rate_limit': '10/m'}}` and finally we call both tasks few times: ```python for i in range(500): task_a.delay() task_b.delay() ``` This causes the `task_b` to be rate limited as well although it is not configured to be rate limited. I have tried putting both `task_a` and `task_b` to different queues as well but the problem persists. Removing the `task_annotations` configuration altogether makes both tasks execute at normal speed.

Expected Behavior

Tasks not present in task_annotations should not be rate limited.

Actual Behavior

All tasks are getting rate limited irrespective of they are configured to be rate limited via task_annotations or not.

[2023-03-10 18:32:16,424: WARNING/ForkPoolWorker-8] task_b 2023-03-10 18:32:16.423757
[2023-03-10 18:32:16,424: WARNING/ForkPoolWorker-8] task_b 2023-03-10 18:32:16.423757
[2023-03-10 18:32:22,421: WARNING/ForkPoolWorker-8] task_a 2023-03-10 18:32:22.421259
[2023-03-10 18:32:22,427: WARNING/ForkPoolWorker-8] task_b 2023-03-10 18:32:22.426628
[2023-03-10 18:32:28,422: WARNING/ForkPoolWorker-8] task_a 2023-03-10 18:32:28.422477
[2023-03-10 18:32:28,428: WARNING/ForkPoolWorker-8] task_b 2023-03-10 18:32:28.427871
[2023-03-10 18:32:34,426: WARNING/ForkPoolWorker-8] task_a 2023-03-10 18:32:34.425626
[2023-03-10 18:32:34,433: WARNING/ForkPoolWorker-8] task_b 2023-03-10 18:32:34.433257
[2023-03-10 18:32:40,426: WARNING/ForkPoolWorker-8] task_a 2023-03-10 18:32:40.426597
[2023-03-10 18:32:40,431: WARNING/ForkPoolWorker-8] task_b 2023-03-10 18:32:40.431649
[2023-03-10 18:32:46,429: WARNING/ForkPoolWorker-8] task_a 2023-03-10 18:32:46.428811
[2023-03-10 18:32:46,432: WARNING/ForkPoolWorker-8] task_b 2023-03-10 18:32:46.432359
open-collective-bot[bot] commented 1 year ago

Hey @erfaan :wave:, Thank you for opening an issue. We will get back to you as soon as we can. Also, check out our Open Collective and consider backing us - every little helps!

We also offer priority support for our sponsors. If you require immediate assistance please consider sponsoring us.

glumia commented 1 year ago

+1, it seems this happens because the worker will not fetch any new tasks when its buffer controlled with --prefetch-multiplier is full, even if all the tasks it currently has can't be processed because of the rate limit.

auvipy commented 1 year ago

were you able to find any solution/fix?

glumia commented 1 year ago

No, unfortunately, I didn't have a chance to further look into it, so far the precaution we're taking is to avoid using the rate_limit parameter on tasks that run on workers that may process also others.

RJPercival commented 1 year ago

I raised this issue about a year ago: https://github.com/celery/celery/discussions/7803. I think the solution is to increment the prefetch limit, as is done when a task with a future ETA is received.

RJPercival commented 1 year ago

@auvipy, can I interpret you self-assigning this ticket to mean you're going to look into this? It'd be awesome to get this fixed 😁

auvipy commented 1 year ago

I will try with the help of other team members. lets see how it end up

dbartenstein commented 11 months ago

I will try with the help of other team members. lets see how it end up

Hi @auvipy, we also run into this issue. @RJPercival’s idea of of incrementing the prefetch limit sounds interesting.

RJPercival commented 10 months ago

@auvipy, as a sponsor, could I please request priority support on this? It's been over a year now with no progress.

auvipy commented 10 months ago

@Nusnus would you mind having a look into it? When you have time please?

Nusnus commented 10 months ago

@Nusnus would you mind having a look into it? When you have time please?

My highest priority at this moment is the upcoming pytest-celery v1.0.0 release and the following Celery v5.4.0 release, so I am fully booked for the near future 🙏

@auvipy, as a sponsor, could I please request priority support on this? It's been over a year now with no progress.

Noted. I’ll add it to the patch release pipeline for v5.4.

RJPercival commented 6 days ago

No progress on this?

RJPercival commented 6 days ago

If it helps move this along, I think the problem is that Consumer._limit_task() calls _schedule_bucket_request() which, if rate-limiting kicks in, schedules a call back to itself with a delay. https://github.com/celery/celery/blob/1b35d1d5966614ce36af75808ee21b0d2db6745d/celery/worker/consumer/consumer.py#L316-L318 https://github.com/celery/celery/blob/1b35d1d5966614ce36af75808ee21b0d2db6745d/celery/worker/consumer/consumer.py#L309-L312 Unlike with ETA-based delays, self.qos isn't incremented in this case, which leaves the consumer unable to fetch more tasks to execute until the rate-limiting subsides. https://github.com/celery/celery/blob/1b35d1d5966614ce36af75808ee21b0d2db6745d/celery/worker/strategy.py#L197-L202

RJPercival commented 6 days ago

Ideally, you'd expect rate-limiting to reuse the ETA logic, as a rate-limited task is effectively just one that is scheduled for execution at a future time when it is expected the rate-limiting will have subsided (as indicated by the hold variable).