celery / kombu

Messaging library for Python.
http://kombu.readthedocs.org/
BSD 3-Clause "New" or "Revised" License

Celery Worker fails to properly handle loss of SQS connection; fails to process any further messages. #931

Closed rthille closed 5 years ago

rthille commented 5 years ago

Filing this in Kombu, because I think the fix should be here, not in Celery, but I started creating the issue in Celery, so I've attached the info requested for issues there.

Steps to reproduce

Run a long-running Celery app with an SQS connection, using Celery 4.2.1 and Kombu 4.2.1. Have AWS SQS hit an issue and prematurely close the connection at the wrong time. This is similar to https://github.com/celery/kombu/issues/796 and https://github.com/celery/celery/issues/3990 (which I commented on); however, the process did not appear to attempt to re-establish the connection.

Expected behavior

Celery app handles exception and reconnects to SQS or exits (and is restarted by external systems).

Actual behavior

Exception is logged, but no further SQS messages are processed by the celery app, and no further logs were produced until another developer kill -HUP'd the main worker process. Here's the traceback logged:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/kombu/asynchronous/hub.py", line 136, in fire_timers
    entry()
  File "/usr/local/lib/python3.7/dist-packages/kombu/asynchronous/timer.py", line 68, in __call__
    return self.fun(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.7/dist-packages/kombu/asynchronous/timer.py", line 127, in _reschedules
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/kombu/asynchronous/http/curl.py", line 108, in _timeout_check
    self._process_pending_requests()
  File "/usr/local/lib/python3.7/dist-packages/kombu/asynchronous/http/curl.py", line 132, in _process_pending_requests
    self._process(curl, errno, reason)
  File "/usr/local/lib/python3.7/dist-packages/kombu/asynchronous/http/curl.py", line 178, in _process
    buffer=buffer, effective_url=effective_url, error=error,
  File "/usr/local/lib/python3.7/dist-packages/vine/promises.py", line 150, in __call__
    svpending(*ca, **ck)
  File "/usr/local/lib/python3.7/dist-packages/vine/promises.py", line 143, in __call__
    return self.throw()
  File "/usr/local/lib/python3.7/dist-packages/vine/promises.py", line 140, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/usr/local/lib/python3.7/dist-packages/vine/funtools.py", line 100, in _transback
    return callback(ret)
  File "/usr/local/lib/python3.7/dist-packages/vine/promises.py", line 143, in __call__
    return self.throw()
  File "/usr/local/lib/python3.7/dist-packages/vine/promises.py", line 140, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/usr/local/lib/python3.7/dist-packages/vine/funtools.py", line 98, in _transback
    callback.throw()
  File "/usr/local/lib/python3.7/dist-packages/vine/funtools.py", line 96, in _transback
    ret = filter_(*args + (ret,), **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/kombu/asynchronous/aws/connection.py", line 233, in _on_list_ready
    raise self._for_status(response, response.read())
Exception: Request Empty body  HTTP 599  TCP connection reset by peer (None)

Attempting to reproduce the error with 'tcpkill' results in a different traceback, which makes it all the way up to the top level and results in a process exit:

[2018-10-16 20:43:12,666: CRITICAL/MainProcess] Unrecoverable error: Exception('Request Empty body  HTTP 599  TCP connection reset by peer (None)')
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/celery/worker/worker.py", line 205, in start
    self.blueprint.start(self)
  File "/usr/local/lib/python3.7/dist-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/local/lib/python3.7/dist-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "/usr/local/lib/python3.7/dist-packages/celery/worker/consumer/consumer.py", line 317, in start
    blueprint.start(self)
  File "/usr/local/lib/python3.7/dist-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/local/lib/python3.7/dist-packages/celery/worker/consumer/consumer.py", line 593, in start
    c.loop(*c.loop_args())
  File "/usr/local/lib/python3.7/dist-packages/celery/worker/loops.py", line 91, in asynloop
    next(loop)
  File "/usr/local/lib/python3.7/dist-packages/kombu/asynchronous/hub.py", line 354, in create_loop
    cb(*cbargs)
  File "/usr/local/lib/python3.7/dist-packages/kombu/asynchronous/http/curl.py", line 111, in on_readable
    return self._on_event(fd, _pycurl.CSELECT_IN)
  File "/usr/local/lib/python3.7/dist-packages/kombu/asynchronous/http/curl.py", line 124, in _on_event
    self._process_pending_requests()
  File "/usr/local/lib/python3.7/dist-packages/kombu/asynchronous/http/curl.py", line 132, in _process_pending_requests
    self._process(curl, errno, reason)
  File "/usr/local/lib/python3.7/dist-packages/kombu/asynchronous/http/curl.py", line 178, in _process
    buffer=buffer, effective_url=effective_url, error=error,
  File "/usr/local/lib/python3.7/dist-packages/vine/promises.py", line 150, in __call__
    svpending(*ca, **ck)
  File "/usr/local/lib/python3.7/dist-packages/vine/promises.py", line 143, in __call__
    return self.throw()
  File "/usr/local/lib/python3.7/dist-packages/vine/promises.py", line 140, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/usr/local/lib/python3.7/dist-packages/vine/funtools.py", line 100, in _transback
    return callback(ret)
  File "/usr/local/lib/python3.7/dist-packages/vine/promises.py", line 143, in __call__
    return self.throw()
  File "/usr/local/lib/python3.7/dist-packages/vine/promises.py", line 140, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/usr/local/lib/python3.7/dist-packages/vine/funtools.py", line 98, in _transback
    callback.throw()
  File "/usr/local/lib/python3.7/dist-packages/vine/funtools.py", line 96, in _transback
    ret = filter_(*args + (ret,), **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/kombu/asynchronous/aws/connection.py", line 233, in _on_list_ready
    raise self._for_status(response, response.read())
Exception: Request Empty body  HTTP 599  TCP connection reset by peer (None)

It seems that the _for_status function in Kombu's asynchronous/aws/connection.py should produce a ConnectionError when response.status == 599, rather than a generic Exception, and hub.py should handle ConnectionError by re-raising it. I've got a patch against the 4.2.1 tag:

bash-3.2$ git diff
diff --git a/kombu/asynchronous/aws/connection.py b/kombu/asynchronous/aws/connection.py
index df257be3..e2f10368 100644
--- a/kombu/asynchronous/aws/connection.py
+++ b/kombu/asynchronous/aws/connection.py
@@ -7,6 +7,7 @@ from vine import promise, transform
 from kombu.asynchronous.aws.ext import AWSRequest, get_response

 from kombu.asynchronous.http import Headers, Request, get_client
+from kombu.exceptions import ConnectionError
 from kombu.five import items, python_2_unicode_compatible

 import io
@@ -254,6 +255,10 @@ class AsyncAWSQueryConnection(AsyncConnection):

     def _for_status(self, response, body):
         context = 'Empty body' if not body else 'HTTP Error'
+        if response.status == 599:
+            return ConnectionError("Request {}  HTTP {}  {} ({})".format(
+                context, response.status, response.reason, body
+            ))
         return Exception("Request {}  HTTP {}  {} ({})".format(
             context, response.status, response.reason, body
         ))
diff --git a/kombu/asynchronous/hub.py b/kombu/asynchronous/hub.py
index d2f8b940..85fdbd06 100644
--- a/kombu/asynchronous/hub.py
+++ b/kombu/asynchronous/hub.py
@@ -8,6 +8,7 @@ from contextlib import contextmanager
 from time import sleep
 from types import GeneratorType as generator  # noqa

+from kombu.exceptions import ConnectionError
 from kombu.five import Empty, python_2_unicode_compatible, range
 from kombu.log import get_logger
 from kombu.utils.compat import fileno
@@ -142,6 +143,8 @@ class Hub(object):
                     if exc.errno == errno.ENOMEM:
                         raise
                     logger.error('Error in timer: %r', exc, exc_info=1)
+                except ConnectionError:
+                    raise
                 except Exception as exc:
                     logger.error('Error in timer: %r', exc, exc_info=1)
         return min(delay or min_delay, max_delay)

However, I haven't been able to reproduce the problem reliably, so I'm not sure whether this is a good fix.
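To make the intent of the patch easier to follow outside of Kombu, here is a minimal, self-contained sketch of the same idea. It is not Kombu's code: `TransportConnectionError`, `for_status` and `fire_timer` are hypothetical stand-ins for `kombu.exceptions.ConnectionError`, `_for_status` and the timer loop in `hub.py`. It only assumes what the traceback above shows, namely that a reset connection surfaces as status 599.

```python
class TransportConnectionError(Exception):
    """Hypothetical stand-in for kombu.exceptions.ConnectionError."""


def for_status(status, reason, body):
    """Build (not raise) an exception describing a failed HTTP response."""
    context = 'Empty body' if not body else 'HTTP Error'
    message = 'Request {}  HTTP {}  {} ({})'.format(
        context, status, reason, body
    )
    # 599 is the status seen in the traceback for a reset connection,
    # so map it to a dedicated exception type the caller can recognise.
    if status == 599:
        return TransportConnectionError(message)
    return Exception(message)


def fire_timer(entry, log):
    """Run one timer entry; re-raise connection errors, log anything else."""
    try:
        entry()
    except TransportConnectionError:
        # Let the caller decide whether to reconnect or exit.
        raise
    except Exception as exc:
        log.append(repr(exc))
```

With this shape, an unrelated failure in a timer is still only logged, while a lost connection propagates to whatever is driving the loop instead of being swallowed.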

Celery Report output (company name and a few other things replaced with REDACTED):

celery -A REDACTEDlib.queueing report

software -> celery:4.2.1 (windowlicker) kombu:4.2.1 py:3.7.0
            billiard:3.5.0.4 sqs:N/A
platform -> system:Linux arch:64bit, ELF imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:sqs results:django-db

ABSOLUTE_URL_OVERRIDES: {
 }
ADMINS: []
ALLOWED_HOSTS: ['integrator', '.REDACTED.com']
APPEND_SLASH: True
AUTHENTICATION_BACKENDS: ['django.contrib.auth.backends.ModelBackend']
AUTH_PASSWORD_VALIDATORS: '********'
AUTH_USER_MODEL: 'auth.User'
AWS_LOCATION: 'static/integrator'
AWS_S3_OBJECT_PARAMETERS: {
 'CacheControl': 'max-age=0,must-revalidate,public,proxy-revalidate'}
AWS_STORAGE_BUCKET_NAME: 'REDACTED'
BASE_DIR: '/root/integrator'
CACHES: {
 'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}}
CACHE_MIDDLEWARE_ALIAS: 'default'
CACHE_MIDDLEWARE_KEY_PREFIX: '********'
CACHE_MIDDLEWARE_SECONDS: 600
REDACTED_APIS: '********'
REDACTED_APPNAME: 'integrator'
REDACTED_DATETIME_FORMAT: '%Y-%m-%dT%H:%M:%S%z'
REDACTED_DATETIME_FORMAT_UTC: '%Y-%m-%d %H:%M:%S'
REDACTED_DATE_FORMAT: '%Y-%m-%d'
REDACTED_DISABLE_AUTH: False
REDACTED_DISABLE_AUTHORIZATION: False
REDACTED_FILE_UPLOAD_MAX_FILE_SIZE: 52428800
REDACTED_IN_TEST: False
CELERY_RESULT_BACKEND: 'django-db'
CORS_ORIGIN_ALLOW_ALL: True
CSRF_COOKIE_AGE: 31449600
CSRF_COOKIE_DOMAIN: None
CSRF_COOKIE_HTTPONLY: False
CSRF_COOKIE_NAME: 'csrftoken'
CSRF_COOKIE_PATH: '/'
CSRF_COOKIE_SAMESITE: 'Lax'
CSRF_COOKIE_SECURE: False
CSRF_FAILURE_VIEW: 'django.views.csrf.csrf_failure'
CSRF_HEADER_NAME: 'HTTP_X_CSRFTOKEN'
CSRF_TRUSTED_ORIGINS: []
CSRF_USE_SESSIONS: False
DATABASES: {
    'default': {   'ATOMIC_REQUESTS': False,
                   'AUTOCOMMIT': True,
                   'CONN_MAX_AGE': 0,
                   'ENGINE': 'django.db.backends.mysql',
                   'HOST': 'REDACTED',
                   'NAME': 'integrator',
                   'OPTIONS': {},
                   'PASSWORD': '********',
                   'PORT': '',
                   'TEST': {   'CHARSET': None,
                               'COLLATION': None,
                               'MIRROR': None,
                               'NAME': None},
                   'TIME_ZONE': None,
                   'USER': 'integrator'}}
DATABASE_ROUTERS: '********'
DATA_UPLOAD_MAX_MEMORY_SIZE: 2621440
DATA_UPLOAD_MAX_NUMBER_FIELDS: 1000
DATETIME_FORMAT: 'N j, Y, P'
DATETIME_INPUT_FORMATS: ['%Y-%m-%d %H:%M:%S',
 '%Y-%m-%d %H:%M:%S.%f',
 '%Y-%m-%d %H:%M',
 '%Y-%m-%d',
 '%m/%d/%Y %H:%M:%S',
 '%m/%d/%Y %H:%M:%S.%f',
 '%m/%d/%Y %H:%M',
 '%m/%d/%Y',
 '%m/%d/%y %H:%M:%S',
 '%m/%d/%y %H:%M:%S.%f',
 '%m/%d/%y %H:%M',
 '%m/%d/%y']
DATE_FORMAT: 'N j, Y'
DATE_INPUT_FORMATS: ['%Y-%m-%d',
 '%m/%d/%Y',
 '%m/%d/%y',
 '%b %d %Y',
 '%b %d, %Y',
 '%d %b %Y',
 '%d %b, %Y',
 '%B %d %Y',
 '%B %d, %Y',
 '%d %B %Y',
 '%d %B, %Y']
DEBUG: False
DEBUG_PROPAGATE_EXCEPTIONS: False
DECIMAL_SEPARATOR: '.'
DEFAULT_CHARSET: 'utf-8'
DEFAULT_CONTENT_TYPE: 'application/vnd.api+json'
DEFAULT_EXCEPTION_REPORTER_FILTER: 'django.views.debug.SafeExceptionReporterFilter'
DEFAULT_FILE_STORAGE: 'django.core.files.storage.FileSystemStorage'
DEFAULT_FROM_EMAIL: 'webmaster@localhost'
DEFAULT_INDEX_TABLESPACE: ''
DEFAULT_TABLESPACE: ''
DISALLOWED_USER_AGENTS: []
EMAIL_BACKEND: 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST: 'localhost'
EMAIL_HOST_PASSWORD: '********'
EMAIL_HOST_USER: ''
EMAIL_PORT: 25
EMAIL_SSL_CERTFILE: None
EMAIL_SSL_KEYFILE: '********'
EMAIL_SUBJECT_PREFIX: '[Django] '
EMAIL_TIMEOUT: None
EMAIL_USE_LOCALTIME: False
EMAIL_USE_SSL: False
EMAIL_USE_TLS: False
FILE_CHARSET: 'utf-8'
FILE_UPLOAD_DIRECTORY_PERMISSIONS: None
FILE_UPLOAD_HANDLERS: ['django.core.files.uploadhandler.MemoryFileUploadHandler',
 'REDACTEDlib.djangolib.files.LimitedTemporaryFileUploadHandler']
FILE_UPLOAD_MAX_MEMORY_SIZE: 2621440
FILE_UPLOAD_PERMISSIONS: None
FILE_UPLOAD_TEMP_DIR: None
FIRST_DAY_OF_WEEK: 0
FIXTURE_DIRS: []
FORCE_SCRIPT_NAME: None
FORMAT_MODULE_PATH: None
FORM_RENDERER: 'django.forms.renderers.DjangoTemplates'
IGNORABLE_404_URLS: []
INSTALLED_APPS: ['REDACTEDlib.djangolib',
 'REDACTEDlib.runtime.probes.appdefs.HealthProbesApp',
 'django.contrib.auth',
 'django.contrib.contenttypes',
 'django.contrib.sessions',
 'django.contrib.messages',
 'django.contrib.staticfiles',
 'django_celery_beat',
 'django_celery_results',
 'corsheaders',
 'storages',
 'integrator.collector']
INTERNAL_IPS: []
LANGUAGES: [('af', 'Afrikaans'),
 ('ar', 'Arabic'),
 ('ast', 'Asturian'),
 ('az', 'Azerbaijani'),
 ('bg', 'Bulgarian'),
 ('be', 'Belarusian'),
 ('bn', 'Bengali'),
 ('br', 'Breton'),
 ('bs', 'Bosnian'),
 ('ca', 'Catalan'),
 ('cs', 'Czech'),
 ('cy', 'Welsh'),
 ('da', 'Danish'),
 ('de', 'German'),
 ('dsb', 'Lower Sorbian'),
 ('el', 'Greek'),
 ('en', 'English'),
 ('en-au', 'Australian English'),
 ('en-gb', 'British English'),
 ('eo', 'Esperanto'),
 ('es', 'Spanish'),
 ('es-ar', 'Argentinian Spanish'),
 ('es-co', 'Colombian Spanish'),
 ('es-mx', 'Mexican Spanish'),
 ('es-ni', 'Nicaraguan Spanish'),
 ('es-ve', 'Venezuelan Spanish'),
 ('et', 'Estonian'),
 ('eu', 'Basque'),
 ('fa', 'Persian'),
 ('fi', 'Finnish'),
 ('fr', 'French'),
 ('fy', 'Frisian'),
 ('ga', 'Irish'),
 ('gd', 'Scottish Gaelic'),
 ('gl', 'Galician'),
 ('he', 'Hebrew'),
 ('hi', 'Hindi'),
 ('hr', 'Croatian'),
 ('hsb', 'Upper Sorbian'),
 ('hu', 'Hungarian'),
 ('ia', 'Interlingua'),
 ('id', 'Indonesian'),
 ('io', 'Ido'),
 ('is', 'Icelandic'),
 ('it', 'Italian'),
 ('ja', 'Japanese'),
 ('ka', 'Georgian'),
 ('kab', 'Kabyle'),
 ('kk', 'Kazakh'),
 ('km', 'Khmer'),
 ('kn', 'Kannada'),
 ('ko', 'Korean'),
 ('lb', 'Luxembourgish'),
 ('lt', 'Lithuanian'),
 ('lv', 'Latvian'),
 ('mk', 'Macedonian'),
 ('ml', 'Malayalam'),
 ('mn', 'Mongolian'),
 ('mr', 'Marathi'),
 ('my', 'Burmese'),
 ('nb', 'Norwegian Bokmål'),
 ('ne', 'Nepali'),
 ('nl', 'Dutch'),
 ('nn', 'Norwegian Nynorsk'),
 ('os', 'Ossetic'),
 ('pa', 'Punjabi'),
 ('pl', 'Polish'),
 ('pt', 'Portuguese'),
 ('pt-br', 'Brazilian Portuguese'),
 ('ro', 'Romanian'),
 ('ru', 'Russian'),
 ('sk', 'Slovak'),
 ('sl', 'Slovenian'),
 ('sq', 'Albanian'),
 ('sr', 'Serbian'),
 ('sr-latn', 'Serbian Latin'),
 ('sv', 'Swedish'),
 ('sw', 'Swahili'),
 ('ta', 'Tamil'),
 ('te', 'Telugu'),
 ('th', 'Thai'),
 ('tr', 'Turkish'),
 ('tt', 'Tatar'),
 ('udm', 'Udmurt'),
 ('uk', 'Ukrainian'),
 ('ur', 'Urdu'),
 ('vi', 'Vietnamese'),
 ('zh-hans', 'Simplified Chinese'),
 ('zh-hant', 'Traditional Chinese')]
LANGUAGES_BIDI: ['he', 'ar', 'fa', 'ur']
LANGUAGE_CODE: 'en-us'
LANGUAGE_COOKIE_AGE: None
LANGUAGE_COOKIE_DOMAIN: None
LANGUAGE_COOKIE_NAME: 'django_language'
LANGUAGE_COOKIE_PATH: '/'
LOCALE_PATHS: []
LOGGING: {
    'disable_existing_loggers': False,
    'formatters': {'REDACTED': {'class': 'REDACTEDlib.logutils.REDACTEDFormatter'}},
    'handlers': {   'console': {   'class': 'logging.StreamHandler',
                                   'formatter': 'REDACTED'}},
    'loggers': {   '': {   'handlers': ['console'],
                           'level': 'INFO',
                           'propagate': True},
                   'botocore': {   'handlers': ['console'],
                                   'level': 'ERROR',
                                   'propagate': True},
                   'celery.app.trace': {'level': 'ERROR'},
                   'ddtrace': {'level': 'ERROR', 'propagate': False},
                   'django': {'level': 'ERROR'},
                   'django.db.backends': {   'handlers': ['console'],
                                             'level': 'INFO'}},
    'version': 1}
LOGGING_CONFIG: 'logging.config.dictConfig'
LOGIN_REDIRECT_URL: '/accounts/profile/'
LOGIN_URL: '/accounts/login/'
LOGOUT_REDIRECT_URL: None
MANAGERS: []
MEDIA_ROOT: ''
MEDIA_URL: ''
MESSAGE_STORAGE: 'django.contrib.messages.storage.fallback.FallbackStorage'
MIDDLEWARE: ['REDACTEDlib.logutils.middleware.RequestContextMiddleware',
 'REDACTEDlib.logutils.middleware.LoggingMiddleware',
 'REDACTEDlib.djangolib.metrics.middleware.DatadogMiddleware',
 'corsheaders.middleware.CorsMiddleware',
 'django.middleware.security.SecurityMiddleware',
 'django.contrib.sessions.middleware.SessionMiddleware',
 'django.middleware.common.CommonMiddleware',
 'REDACTEDlib.djangolib.authentication.middleware.BearerTokenMiddleware',
 'REDACTEDlib.djangolib.authorization.middleware.AuthorizationMiddleware',
 'django.contrib.messages.middleware.MessageMiddleware',
 'django.middleware.clickjacking.XFrameOptionsMiddleware']
MIDDLEWARE_WITH_AUTHORIZATION2: ['REDACTEDlib.logutils.middleware.RequestContextMiddleware',
 'REDACTEDlib.logutils.middleware.LoggingMiddleware',
 'REDACTEDlib.djangolib.metrics.middleware.DatadogMiddleware',
 'corsheaders.middleware.CorsMiddleware',
 'django.middleware.security.SecurityMiddleware',
 'django.contrib.sessions.middleware.SessionMiddleware',
 'django.middleware.common.CommonMiddleware',
 'REDACTEDlib.djangolib.authentication.middleware.BearerTokenMiddleware',
 'REDACTEDlib.djangolib.authorization2.middleware.AuthorizationMiddleware',
 'django.contrib.messages.middleware.MessageMiddleware',
 'django.middleware.clickjacking.XFrameOptionsMiddleware']
MIGRATION_MODULES: {
 }
MONTH_DAY_FORMAT: 'F j'
NUMBER_GROUPING: 0
PASSWORD_HASHERS: '********'
PASSWORD_RESET_TIMEOUT_DAYS: '********'
PREPEND_WWW: False
ROOT_URLCONF: 'integrator.settings.urls'
SECRET_KEY: '********'
SECURE_BROWSER_XSS_FILTER: True
SECURE_CONTENT_TYPE_NOSNIFF: True
SECURE_HSTS_INCLUDE_SUBDOMAINS: False
SECURE_HSTS_PRELOAD: False
SECURE_HSTS_SECONDS: 0
SECURE_PROXY_SSL_HEADER: None
SECURE_REDIRECT_EXEMPT: []
SECURE_SSL_HOST: None
SECURE_SSL_REDIRECT: False
SERVER_EMAIL: 'root@localhost'
SESSION_CACHE_ALIAS: 'default'
SESSION_COOKIE_AGE: 1209600
SESSION_COOKIE_DOMAIN: None
SESSION_COOKIE_HTTPONLY: True
SESSION_COOKIE_NAME: 'sessionid'
SESSION_COOKIE_PATH: '/'
SESSION_COOKIE_SAMESITE: 'Lax'
SESSION_COOKIE_SECURE: False
SESSION_ENGINE: 'django.contrib.sessions.backends.db'
SESSION_EXPIRE_AT_BROWSER_CLOSE: False
SESSION_FILE_PATH: None
SESSION_SAVE_EVERY_REQUEST: False
SESSION_SERIALIZER: 'django.contrib.sessions.serializers.JSONSerializer'
SETTINGS_MODULE: 'integrator.settings.prod'
SHORT_DATETIME_FORMAT: 'm/d/Y P'
SHORT_DATE_FORMAT: 'm/d/Y'
SIGNING_BACKEND: 'django.core.signing.TimestampSigner'
SILENCED_SYSTEM_CHECKS: []
STAGE: 'cicd'
STATICFILES_DIRS: ['/root/integrator/static']
STATICFILES_FINDERS: ['django.contrib.staticfiles.finders.FileSystemFinder',
 'django.contrib.staticfiles.finders.AppDirectoriesFinder']
STATICFILES_STORAGE: 'storages.backends.s3boto3.S3Boto3Storage'
STATIC_ROOT: None
STATIC_URL: '/static/'
TEMPLATES: []
TEST_NON_SERIALIZED_APPS: []
TEST_RUNNER: 'django.test.runner.DiscoverRunner'
THOUSAND_SEPARATOR: ','
TIME_FORMAT: 'P'
TIME_INPUT_FORMATS: ['%H:%M:%S', '%H:%M:%S.%f', '%H:%M']
TIME_ZONE: 'UTC'
USE_I18N: True
USE_L10N: True
USE_THOUSAND_SEPARATOR: False
USE_TZ: True
USE_X_FORWARDED_HOST: False
USE_X_FORWARDED_PORT: False
WSGI_APPLICATION: 'integrator.settings.wsgi.application'
X_FRAME_OPTIONS: 'SAMEORIGIN'
YEAR_MONTH_FORMAT: 'F Y'
is_overridden: <bound method Settings.is_overridden of <Settings "integrator.settings.prod">>
broker_url: 'sqs://AKIAREDACTED:********@localhost//'
broker_transport_options: {
 'queue_name_prefix': 'integrator-', 'region': 'us-west-2'}
worker_hijack_root_logger: False

auvipy commented 5 years ago

Can you verify this on top of master and send a PR with proper tests?

rthille commented 5 years ago

I can try. I'm not sure of the best way to mock out the connection and get it to fail at the right part of the loop. I tried testing with a real Celery app and SQS, using tcpkill to kill the connection, and it didn't result in the issue. If you've got any pointers to get started (other than the Contributing.rst doc I've found), that'd be helpful, as this is my first time looking at the internals of Celery.
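One possible starting point for such a test, sketched here without any Kombu imports, is to fake the HTTP response with `unittest.mock` so that no real SQS connection is needed. The functions `on_list_ready`, `for_status` and `make_response` below are hypothetical stand-ins written for illustration; they mirror the names in the traceback but are not Kombu's implementation or its test suite.

```python
from unittest import mock


def for_status(response, body):
    """Build an exception using the message format seen in the traceback."""
    context = 'Empty body' if not body else 'HTTP Error'
    return Exception('Request {}  HTTP {}  {} ({})'.format(
        context, response.status, response.reason, body
    ))


def on_list_ready(response):
    """Hypothetical callback: return the body on 200, raise otherwise."""
    if response.status != 200:
        raise for_status(response, response.read())
    return response.read()


def make_response(status, reason, body=b''):
    """Build a fake HTTP response without touching the network."""
    response = mock.Mock()
    response.status = status
    response.reason = reason
    response.read.return_value = body
    return response
```

A test can then call `on_list_ready(make_response(599, 'TCP connection reset by peer'))` and assert that it raises. The same fake response could be fed to the real callback once the right entry point in Kombu is identified.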

s-maj commented 5 years ago

I think the actual problem here is that HTTP status codes 4XX and 5XX are unhandled in kombu. In this case 599 can be translated to a malformed response or a request timeout, but it's quite common to get 500 or 503 from the AWS API, or 403 in the case of a nonexistent resource or a lack of proper IAM permissions.
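As a rough illustration of what handling these codes could look like, the sketch below separates the statuses mentioned here into transient ones (worth retrying) and permanent ones (not worth retrying). The grouping, the exception classes and `call_with_retry` are assumptions made for this example, not part of kombu, and `request` is a hypothetical callable returning `(status, reason, body)`.

```python
# Assumption: of the statuses named in this thread, these are retryable.
TRANSIENT_STATUSES = frozenset({500, 503, 599})


class TransientError(Exception):
    """Failure that may succeed on a later attempt (e.g. 503)."""


class PermanentError(Exception):
    """Failure that retrying will not fix (e.g. 403)."""


def error_for_status(status, reason):
    """Pick an exception type based on the HTTP status code."""
    cls = TransientError if status in TRANSIENT_STATUSES else PermanentError
    return cls('HTTP {} {}'.format(status, reason))


def call_with_retry(request, max_attempts=3):
    """Call request() up to max_attempts (>= 1) times, retrying transient errors."""
    last_error = None
    for _ in range(max_attempts):
        status, reason, body = request()
        if status == 200:
            return body
        last_error = error_for_status(status, reason)
        if isinstance(last_error, PermanentError):
            break  # retrying a 403 would just fail again
    raise last_error
```

A real implementation would also want a delay or backoff between attempts; that is left out here to keep the sketch short.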

I applied this patch and ran a load test (the easiest way to trigger 5XX responses from the AWS API), but it didn't help much. Instead of a generic exception I now get ConnectionError, but the final outcome is still Unrecoverable error.

EDIT: Well, setting broker_connection_retry to True (my bad) definitely helped with the Unrecoverable error, but now the worker returns amqp.exceptions.ConnectionError: Request Empty body HTTP 599 Unknown SSL protocol error in connection to eu-west-1.queue.amazonaws.com:443 (None) and hangs.
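For reference, Celery's documented name for this setting is `broker_connection_retry`, with `broker_connection_max_retries` as its companion. A configuration fragment along these lines might look like the following; the module name is hypothetical, and the transport options are simply copied from the report output earlier in this issue.

```python
# celeryconfig.py (hypothetical module name)

# SQS broker; credentials are assumed to come from the environment.
broker_url = 'sqs://'
broker_transport_options = {
    'queue_name_prefix': 'integrator-',
    'region': 'us-west-2',
}

# Retry establishing the broker connection if it is lost.
broker_connection_retry = True
# Give up after this many retries (100 is Celery's documented default).
broker_connection_max_retries = 100
```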

EDIT2: Alright, so all non-200 HTTP responses land here: https://github.com/celery/kombu/blob/110dc10cbce82eb5f2402ae717d45b2a2100e634/kombu/asynchronous/aws/connection.py#L255 @auvipy could you please suggest how this case should be handled?

physicalattraction commented 5 years ago

Any news on this issue?

wadewilliams commented 5 years ago

This seems to be fixed in 4.2.

physicalattraction commented 5 years ago

I am still facing the issue using kombu 4.2.0.

marlonchalegre commented 5 years ago

The error is still there (4.2.0).

singhravi1 commented 5 years ago

Guys, any idea when there will be a stable release with this fix?

wadewilliams commented 5 years ago

4.2.1 "fixed" it in that now the worker dies when the connection is lost, so it may be restarted more easily.

singhravi1 commented 5 years ago

I'm using celery==4.3.0 and kombu==4.5.0, but it died with the following error and didn't restart on its own:

[2019-09-07 17:11:25,492: CRITICAL/MainProcess] Unrecoverable error: Exception('Request HTTP Error HTTP 503 Service Unavailable (b\'<?xml version="1.0"?><ErrorResponse xmlns="http://queue.amazonaws.com/doc/2012-11-05/"><Error><Type>Receiver</Type><Code>ServiceUnavailable</Code><Detail/></Error><RequestId>1e3c6057-164e-5afd-9e5c-35baf0b31d67</RequestId></ErrorResponse>\')',)

It was raised from kombu:

  File "/home/....../lib/python3.6/site-packages/kombu/asynchronous/aws/connection.py", line 245, in _on_list_ready
    raise self._for_status(response, response.read())

auvipy commented 5 years ago

Is this with the latest kombu release?

singhravi1 commented 5 years ago

I currently have kombu==4.5.0 installed. Should I try 4.6.4?

auvipy commented 5 years ago

Please try it and let us know.

singhravi1 commented 5 years ago

It's been 6 days using version 4.6.4 and it's still working. Will report if this happens again :+1:

matthewhegarty commented 4 years ago

For anyone hitting this with Django & Celery, make sure you define Kombu and Pycurl as dependencies. More info here
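A quick way to check whether both packages are actually importable in the worker's environment is sketched below. The helper name `missing_modules` is made up for this example; it only relies on the standard library's `importlib.util.find_spec`, and the module names `kombu` and `pycurl` are the import names of the two packages mentioned above.

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names that cannot be found for import."""
    return [
        name for name in names
        if importlib.util.find_spec(name) is None
    ]


if __name__ == '__main__':
    missing = missing_modules(['kombu', 'pycurl'])
    if missing:
        print('Missing dependencies: {}'.format(', '.join(missing)))
    else:
        print('kombu and pycurl are both importable')
```

Running this inside the same virtualenv or container as the worker shows whether the dependency declaration took effect.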