celery / kombu

Messaging library for Python.
http://kombu.readthedocs.org/
BSD 3-Clause "New" or "Revised" License

`RuntimeError: Set changed size during iteration` with celery prefork #1774

Closed claudinoac closed 7 months ago

claudinoac commented 11 months ago

In celery and kombu v5.3.1, we are constantly being hit by the error below. There are around 4 workers distributed across 3 different servers, listening to the same queues in prefork mode, each with multiple processes (4 to 16). Most of the workers are daemonized using the celeryd init-script; some are spawned manually through the CLI for debug monitoring. All of them are connected to the same redis broker. Some workers simply throw the error below and stop working, at random, and we've been forced to restart them every few hours. This was happening before v5.3 as well; from https://github.com/celery/celery/issues/7162 we thought the issue would be fixed by that update, but unfortunately it wasn't.

```
[2023-07-27 01:28:05,206: CRITICAL/MainProcess] Unrecoverable error: RuntimeError('Set changed size during iteration')
Traceback (most recent call last):
  File "/opt/Conversus/venv/lib/python3.8/site-packages/celery/worker/worker.py", line 202, in start
    self.blueprint.start(self)
  File "/opt/Conversus/venv/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/opt/Conversus/venv/lib/python3.8/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "/opt/Conversus/venv/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 336, in start
    blueprint.start(self)
  File "/opt/Conversus/venv/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/opt/Conversus/venv/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 726, in start
    c.loop(*c.loop_args())
  File "/opt/Conversus/venv/lib/python3.8/site-packages/celery/worker/loops.py", line 97, in asynloop
    next(loop)
  File "/opt/app/venv/lib/python3.8/site-packages/kombu/asynchronous/hub.py", line 310, in create_loop
    for tick_callback in on_tick:
RuntimeError: Set changed size during iteration
```
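For context, the loop that fails in `kombu/asynchronous/hub.py` iterates a plain Python `set` of tick callbacks, and CPython raises this `RuntimeError` whenever a set's size changes while an iterator over it is still live. Here is a minimal sketch of that failure mode (the callback names are made up for illustration; this is not kombu's actual code):

```
# Two tick callbacks share a set, mirroring the shape of hub.py's
# `for tick_callback in on_tick:` loop.
on_tick = set()

def transport_on_poll_start():
    # Simulates the transport unregistering its callback mid-iteration.
    on_tick.discard(transport_on_poll_start)

def pool_on_poll_start():
    pass

on_tick.update({transport_on_poll_start, pool_on_poll_start})

for tick_callback in on_tick:
    tick_callback()  # RuntimeError: Set changed size during iteration
```

Either iteration order trips the error, because CPython's set iterator re-checks the size on every step, including the one that would end the loop.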

Mandatory Debugging Information

Optional Debugging Information

Related Issues and Possible Duplicates

Related Issues

Possible Duplicates

Environment & Settings

Celery version:

celery report Output:

```
software -> celery:5.3.1 (emerald-rush) kombu:5.3.1 py:3.8.6
            billiard:4.1.0 redis:3.5.3
platform -> system:Linux arch:64bit, ELF
            kernel version:4.14.209-160.339.amzn2.x86_64 imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:redis results:redis://:**@**:6379/0

task_queues:
    ( -> celery>,
      -> queue_1.#>,
      -> queue_2.#>,
      -> queue_3.#>,
      -> queue_4.#>,
      -> queue_5>,
      -> queue_6.#>,
      -> queue_7.#>)
task_routes: { ... undisclosed ... }
beat_schedule: { ... undisclosed ... }
timezone: 'UTC'
result_backend: 'redis://:********@****:6379/0'
broker_transport_options: {'visibility_timeout': 604800}
broker_url: 'redis://:********@****:6379/0'
redis_retry_on_timeout: True
redis_socket_keepalive: True
redis_backend_health_check_interval: 5
redis_password: '********'
broker_connection_max_retries: None
broker_pool_limit: None
accept_content: ['json']
task_always_eager: False
task_ignore_result: True
task_soft_time_limit: 36000
worker_pool_restarts: True
worker_prefetch_multiplier: 1
worker_max_memory_per_child: 500000
worker_proc_alive_timeout: 60
worker_send_task_events: True
task_send_sent_event: True
```

Steps to Reproduce

Required Dependencies

Python Packages

pip freeze Output:

```
absl-py==1.4.0
amqp==5.1.1
asgiref==3.4.1
astunparse==1.6.3
backcall==0.2.0
backports.zoneinfo==0.2.1
beautifulsoup4==4.9.3
billiard==4.1.0
blis==0.7.9
botocore==1.30.0
cachetools==4.2.2
catalogue==2.0.6
celery==5.3.1
certifi==2021.10.8
charset-normalizer==2.0.7
clarifai==2.6.2
click==8.1.3
click-didyoumean==0.3.0
click-plugins==1.1.1
click-repl==0.2.0
colorama==0.4.3
colour==0.1.5
confection==0.0.3
configparser==3.8.1
corextopic==1.1
cx-Oracle==8.2.1
cycler==0.11.0
cymem==2.0.5
decorator==5.0.9
dill==0.3.4
distro==1.6.0
Django==3.2.19
docutils==0.15.2
elasticsearch==7.16.2
et-xmlfile==1.1.0
flashtext==2.7
flatbuffers==23.3.3
flower==1.2.0
ftfy==6.0.3
future==0.18.2
gast==0.3.3
grpcio==1.32.0
h5py==2.10.0
hickle==4.0.4
httplib2==0.19.1
humanize==3.13.1
idna==3.3
ijson==3.1.4
inflection==0.5.1
jedi==0.18.0
Jinja2==3.0.1
jmespath==0.10.0
joblib==1.0.1
jsonschema==2.6.0
kiwisolver==1.3.2
kombu==5.3.1
langcodes==3.3.0
libclang==16.0.0
lxml==4.6.3
Markdown==3.3.4
MarkupSafe==2.0.1
mongoengine==0.23.1
mysqlclient==2.0.3
networkx==2.6.3
numpy==1.22.2
opt-einsum==3.3.0
packaging==21.0
pandas==1.3.2
parso==0.8.2
pathy==0.6.0
pendulum==2.1.2
pexpect==4.8.0
Pillow==9.4.0
preshed==3.0.5
prompt-toolkit==3.0.20
protobuf==3.17.3
psycopg2-binary==2.9.1
ptyprocess==0.7.0
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycurl==7.44.1
pydantic==1.8.2
Pygments==2.10.0
pylibmc==1.6.3
pymemcache==3.5.0
pymongo==3.12.0
pyparsing==2.4.7
python-dateutil==2.8.2
python-Levenshtein==0.12.2
pytz==2022.5
pytzdata==2020.1
PyYAML==5.4.1
redis==3.5.3
regex==2022.10.31
requests==2.26.0
rsa==4.7.2
scipy==1.7.1
setuptools-rust==0.12.1
simplejson==3.17.5
six==1.15.0
smart-open==5.2.1
soupsieve==2.2.1
sqlparse==0.4.1
srsly==2.4.5
termcolor==1.1.0
thinc==8.1.5
threadpoolctl==2.2.0
ThreeScalePY==2.6.0
tokenizers==0.12.1
toml==0.10.2
tornado==6.1
tqdm==4.62.2
traitlets==5.1.0
treelib==1.6.1
typer==0.4.2
typing_extensions==4.1.1
tzdata==2023.3
uritemplate==3.0.1
urllib3==1.26.7
vine==5.0.0
wasabi==0.10.1
wcwidth==0.2.5
websocket-client==0.48.0
wrapt==1.12.1
zope.event==4.6
zope.interface==5.5.2
```

Other Dependencies

Minimally Reproducible Test Case

It's not easily reproducible: the error only shows up after the worker has been running and processing tasks for a few hours.

Expected Behavior

The worker keeps running non-stop.

Actual Behavior

At some point, the worker throws the error and gracefully stops all of its processes, since it treats the error as an `Unrecoverable Error`.

jaroslawporada commented 10 months ago

Hi, I have the same issue and can reproduce it with the following preconditions:

[screenshot: reproduction preconditions]

```
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 332, in start
    blueprint.start(self)
  File "/usr/local/lib/python3.11/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 628, in start
    c.loop(*c.loop_args())
  File "/usr/local/lib/python3.11/site-packages/celery/worker/loops.py", line 97, in asynloop
    next(loop)
  File "/usr/local/lib/python3.11/site-packages/kombu/asynchronous/hub.py", line 362, in create_loop
    cb(*cbargs)
  File "/usr/local/lib/python3.11/site-packages/kombu/transport/redis.py", line 1326, in on_readable
    self.cycle.on_readable(fileno)
  File "/usr/local/lib/python3.11/site-packages/kombu/transport/redis.py", line 562, in on_readable
    chan.handlers[type]()
  File "/usr/local/lib/python3.11/site-packages/kombu/transport/redis.py", line 955, in _brpop_read
    dest__item = self.client.parse_response(self.client.connection,
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/redis/client.py", line 1286, in parse_response
    response = connection.read_response()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/redis/connection.py", line 882, in read_response
    response = self._parser.read_response(disable_decoding=disable_decoding)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/redis/connection.py", line 349, in read_response
    result = self._read_response(disable_decoding=disable_decoding)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/redis/connection.py", line 359, in _read_response
    raw = self._buffer.readline()
          ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/redis/connection.py", line 262, in readline
    self._read_from_socket()
  File "/usr/local/lib/python3.11/site-packages/redis/connection.py", line 215, in _read_from_socket
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
redis.exceptions.ConnectionError: Connection closed by server.
```

Library versions for the old/new errors: [screenshot from 2023-09-14 15:07:24]

auvipy commented 7 months ago

Would be happy to get a PR which fixes the issue.

claudinoac commented 7 months ago

@auvipy I'll give it a try :)

claudinoac commented 7 months ago

At the exact point where we observe

```
File "/usr/local/lib/python3.11/site-packages/kombu/asynchronous/hub.py", line 310, in create_loop
    for tick_callback in on_tick:
RuntimeError: Set changed size during iteration
```

I've inspected the contents of `on_tick`, and here's what I found: `on_tick` changed from

```
{
    <function Transport.register_with_event_loop.<locals>.on_poll_start at 0x7f01bbb71700>,
    <function AsynPool._create_write_handlers.<locals>.on_poll_start at 0x7f020d9ea430>
}
```

to

```
{<function AsynPool._create_write_handlers.<locals>.on_poll_start at 0x7f020d9ea430>}
```

When I made a shallow copy of `on_tick` right before iterating, the worker kept running, but at a given point it simply stopped consuming the queues. `Transport.register_with_event_loop.<locals>.on_poll_start` maps back to the `kombu/transport/redis.py` module.
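For reference, the shallow-copy experiment amounts to a one-line change in the hub loop; this is a sketch of what was tried, not an upstream fix:

```
# Iterate over a snapshot so membership changes made by a callback
# cannot invalidate the live iterator.
for tick_callback in list(on_tick):
    tick_callback()
```

This makes the `RuntimeError` go away, but as observed above it only hides the underlying problem: once the transport's `on_poll_start` callback removes itself and is never re-registered, the worker silently stops polling the broker.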

I also tried to force redis to time out while running CPU-heavy operations locally, but the worker raises the expected `ConnectionError` and restarts itself. This specific issue keeps happening only in production.
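For anyone trying to recreate that locally, one way to approximate a server-side disconnect is to kill the worker's broker connections from another client. A hedged sketch using redis-py, assuming a broker on localhost:6379 (the host, port, and the kill-everyone-else approach are assumptions, not part of the report):

```
import redis

# Forcibly close every other client connection on the broker so the
# worker's socket dies with "Connection closed by server."
r = redis.Redis(host="localhost", port=6379)
own_id = r.client_id()  # keep the connection issuing these commands alive

for client in r.client_list():
    if int(client["id"]) != own_id:
        r.client_kill(client["addr"])  # CLIENT KILL ADDR ip:port
```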

claudinoac commented 7 months ago

Hey everyone, quick update: after a while going over this, I've finally been able to reproduce the issue.

Steps:

Now that I can reliably reproduce it, I'll dig into this.