celery / celery

Distributed Task Queue (development branch)
https://docs.celeryq.dev

Broker heartbeats not sent during graceful shutdown #5998

Open hmatland opened 4 years ago

hmatland commented 4 years ago

Checklist

Mandatory Debugging Information

Optional Debugging Information

Related Issues and Possible Duplicates

Related Issues

Possible Duplicates

Environment & Settings

4.4.1 (cliffs):

celery report Output:

```
software -> celery:4.4.1 (cliffs) kombu:4.6.8 py:3.7.3
            billiard:3.6.3.0 py-amqp:2.5.2
platform -> system:Darwin arch:64bit
            kernel version:19.2.0 imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:pyamqp results:redis://localhost/

broker_url: 'amqp://rabbitmq:********@localhost:5672//'
result_backend: 'redis://localhost/'
task_soft_time_limit: 3600
task_time_limit: 4000
task_acks_late: True
task_acks_on_failure_or_timeout: True
worker_max_tasks_per_child: 1
worker_concurrency: 1
worker_prefetch_multiplier: 1
broker_heartbeat: 10
task_default_queue: 'heartbeat-testing'
```

Steps to Reproduce

The connection will be dropped by RabbitMQ:

2020-03-11 08:30:10.554 [error] <0.3468.0> closing AMQP connection <0.3468.0> (172.18.0.1:46900 -> 172.18.0.2:5672):
missed heartbeats from client, timeout: 10s

The message is completed, but not acked. It is then redelivered to another worker, or to the same worker when it restarts.

Heartbeats are not sent during graceful shutdown, causing message redelivery.

Required Dependencies

Note that the above is not a "minimal version", but merely what I created this report with. I have not taken steps to test with a wide variety of dependency combinations.

Python Packages

pip freeze Output:

```
amqp==2.5.2
billiard==3.6.3.0
celery==4.4.1
importlib-metadata==1.5.0
kombu==4.6.8
pytz==2019.3
redis==3.4.1
vine==1.3.0
zipp==3.1.0
```

Other Dependencies

N/A

Minimally Reproducible Test Case

```python
from time import sleep

from celery import Celery

app = Celery('tasks', broker='pyamqp://rabbitmq:rabbitmq@localhost//')
app.conf.update(
    result_backend='redis://localhost',
    task_soft_time_limit=3600,
    task_time_limit=4000,
    task_acks_late=True,
    task_acks_on_failure_or_timeout=True,
    worker_max_tasks_per_child=1,
    worker_concurrency=1,
    worker_prefetch_multiplier=1,
    broker_heartbeat=10,
    task_default_queue="heartbeat-testing"
)


@app.task(name='heartbeat-test-task')
def add(x, y):
    sleep(30)
    return x + y
```

Trigger the task, and send a `SIGTERM` to the worker process. The task will complete successfully, but not be acked, and thus be redelivered.
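A hypothetical driver for the reproduction, assuming the module above is saved as `tasks.py` and a worker has already been started against it (the worker PID is a placeholder, not a real value):

```python
# Enqueue one task against the app defined above; once the worker has picked
# it up, send SIGTERM to the worker's MainProcess from a shell, e.g.
#   kill -TERM <worker-pid>
# The task finishes ~30s later, but the ack fails and the message is
# redelivered on the next worker start.
from tasks import add

add.delay(4, 4)
```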

Expected Behavior

I expect the message to be handled correctly during graceful shutdown: the worker should continue sending heartbeats until it actually shuts down, and the message should be acked and not redelivered.

Actual Behavior

The message is completed without the worker logs stating that anything is wrong. The message is then redelivered because the connection was closed by the broker.

The issue hit us quite hard due to the combination:

For now we run `broker_heartbeat=0` to allow us to complete tasks during graceful shutdown. Ideally we'd like to have heartbeats enabled.
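A minimal sketch of that workaround, applied to the `app` from the test case above:

```python
# Workaround from this report (not a fix): disable AMQP heartbeats entirely so
# the broker never closes the connection for missed heartbeats while the
# worker is draining tasks during a warm shutdown. The trade-off, noted later
# in this thread, is that dead connections may linger on the broker.
app.conf.broker_heartbeat = 0
```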

zerog2k commented 4 years ago

Confirming that this is biting us on celery 4.3.0 as well; we are unable to use graceful shutdown (`stopwait`) effectively during scale-in with our long-running tasks (several minutes).

We are able to use a larger heartbeat timeout on RabbitMQ, but it is not ideal: https://www.rabbitmq.com/heartbeats.html (the heartbeat timeout increase must also be set in the celery config, as the client-side default is only 2 minutes).
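For reference, a rough sketch of that mitigation on the Celery side (the 30-minute value is purely illustrative, not something from this thread):

```python
# Raise the client-side heartbeat so it comfortably outlasts the longest
# expected shutdown window; Celery's default broker_heartbeat is 120 seconds.
# The RabbitMQ server's own heartbeat setting should be raised to match.
app.conf.broker_heartbeat = 1800  # seconds, illustrative only
```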


here is what we see on the celery side:

task is picked up by celery:

[2020-03-27 22:00:40,563.563] DEBUG [8901 139780993414976] [celery.worker.request:301] Task accepted: yahoo.contrib.yxs2.tasks.job_queue_decider[3212ba65-eaec-4e4b-96f1-b8317bb098f7] pid:8928

Then issue `celery stopwait`:

[2020-03-27 22:00:45,975.975] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Worker: Closing Hub...
[2020-03-27 22:00:45,975.975] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Worker: Closing Pool...
[2020-03-27 22:00:45,976.976] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Worker: Closing Consumer...
[2020-03-27 22:00:45,976.976] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Worker: Stopping Consumer...
[2020-03-27 22:00:45,976.976] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Consumer: Closing Connection...
[2020-03-27 22:00:45,976.976] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Consumer: Closing Events...
[2020-03-27 22:00:45,976.976] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Consumer: Closing Heart...
[2020-03-27 22:00:45,976.976] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Consumer: Closing Mingle...
[2020-03-27 22:00:45,976.976] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Consumer: Closing Gossip...
[2020-03-27 22:00:45,976.976] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Consumer: Closing Tasks...
[2020-03-27 22:00:45,976.976] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Consumer: Closing Control...
[2020-03-27 22:00:45,976.976] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Consumer: Closing event loop...
[2020-03-27 22:00:45,976.976] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Consumer: Stopping event loop...
[2020-03-27 22:00:45,976.976] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Consumer: Stopping Control...
[2020-03-27 22:00:45,980.980] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Consumer: Stopping Tasks...
[2020-03-27 22:00:45,981.981] DEBUG [8901 139780993414976] [celery.worker.consumer.tasks:54] Canceling task consumer...
[2020-03-27 22:00:45,983.983] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Consumer: Stopping Gossip...
[2020-03-27 22:00:45,987.987] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Consumer: Stopping Mingle...
[2020-03-27 22:00:45,987.987] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Consumer: Stopping Heart...
[2020-03-27 22:00:45,987.987] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Consumer: Stopping Events...
[2020-03-27 22:00:45,987.987] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Consumer: Stopping Connection...
[2020-03-27 22:00:45,987.987] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Worker: Stopping Pool...

`stopwait` blocks for a few minutes while my task completes, then:

[2020-03-27 22:08:21,036.36] INFO [8928 139780993414976] [celery.app.trace:124] Task yahoo.contrib.yxs2.tasks.job_queue_decider[3212ba65-eaec-4e4b-96f1-b8317bb098f7] succeeded in 460.4728824859776s: None
[2020-03-27 22:08:22,142.142] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Worker: Stopping Hub...
[2020-03-27 22:08:22,142.142] CRITICAL [8901 139780993414976] [celery.worker.request:134] Couldn't ack 3, reason:ConnectionResetError(104, 'Connection reset by peer')
Traceback (most recent call last):
  File "/home/ec2-user/yxs2/lib/python3.6/site-packages/celery/worker/worker.py", line 205, in start
    self.blueprint.start(self)
  File "/home/ec2-user/yxs2/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/home/ec2-user/yxs2/lib/python3.6/site-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "/home/ec2-user/yxs2/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "/home/ec2-user/yxs2/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/home/ec2-user/yxs2/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 596, in start
    c.loop(*c.loop_args())
  File "/home/ec2-user/yxs2/lib/python3.6/site-packages/celery/worker/loops.py", line 91, in asynloop
    next(loop)
  File "/home/ec2-user/yxs2/lib/python3.6/site-packages/kombu/asynchronous/hub.py", line 306, in create_loop
    events = poll(poll_timeout)
  File "/home/ec2-user/yxs2/lib/python3.6/site-packages/kombu/utils/eventio.py", line 84, in poll
    return self._epoll.poll(timeout if timeout is not None else -1)
  File "/home/ec2-user/yxs2/lib/python3.6/site-packages/celery/apps/worker.py", line 284, in _handle_request
    raise exc(exitcode)
celery.exceptions.WorkerShutdown: 0

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ec2-user/yxs2/lib/python3.6/site-packages/kombu/message.py", line 131, in ack_log_error
    self.ack(multiple=multiple)
  File "/home/ec2-user/yxs2/lib/python3.6/site-packages/kombu/message.py", line 126, in ack
    self.channel.basic_ack(self.delivery_tag, multiple=multiple)
  File "/home/ec2-user/yxs2/lib/python3.6/site-packages/amqp/channel.py", line 1394, in basic_ack
    spec.Basic.Ack, argsig, (delivery_tag, multiple),
  File "/home/ec2-user/yxs2/lib/python3.6/site-packages/amqp/abstract_channel.py", line 59, in send_method
    conn.frame_writer(1, self.channel_id, sig, args, content)
  File "/home/ec2-user/yxs2/lib/python3.6/site-packages/amqp/method_framing.py", line 172, in write_frame
    write(view[:offset])
  File "/home/ec2-user/yxs2/lib/python3.6/site-packages/amqp/transport.py", line 284, in write
    self._write(s)
ConnectionResetError: [Errno 104] Connection reset by peer
[2020-03-27 22:08:22,144.144] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Consumer: Shutdown Control...
[2020-03-27 22:08:22,144.144] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Consumer: Shutdown Tasks...
[2020-03-27 22:08:22,144.144] DEBUG [8901 139780993414976] [celery.worker.consumer.tasks:54] Canceling task consumer...
[2020-03-27 22:08:22,144.144] DEBUG [8901 139780993414976] [celery.worker.consumer.tasks:61] Closing consumer channel...
[2020-03-27 22:08:22,144.144] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Consumer: Shutdown Gossip...
[2020-03-27 22:08:22,144.144] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Consumer: Shutdown Heart...
[2020-03-27 22:08:22,144.144] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Consumer: Shutdown Events...
[2020-03-27 22:08:22,144.144] DEBUG [8901 139780993414976] [celery.bootsteps:262] | Consumer: Shutdown Connection...
[2020-03-27 22:08:22,145.145] DEBUG [8901 139780993414976] [celery.concurrency.asynpool:1278] removing tasks from inqueue until task handler finished
zerog2k commented 4 years ago

Also, can someone from the celery team verify whether broker_heartbeat can be set via an environment variable, i.e. CELERY_BROKER_HEARTBEAT? In my limited testing (4.3.0) this appears not to be respected, and I must set broker_heartbeat in my celery settings module. (Is this a separate bug? Can someone point me to how/where environment variables are supposed to be parsed by celery? I found https://github.com/celery/celery/blob/master/celery/app/utils.py#L90-L118, but I'm not convinced this is the correct or complete view of this mechanism.)
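For now the closest thing I have is to read the variable myself in the settings module, roughly like this (not a confirmed Celery feature, just a plain-Python workaround; the 120-second fallback matches Celery's documented default):

```python
# Plain-Python workaround: CELERY_BROKER_HEARTBEAT does not appear to be
# picked up automatically, so read it explicitly in the settings module
# that the Celery app is configured from.
import os

broker_heartbeat = int(os.environ.get('CELERY_BROKER_HEARTBEAT', '120'))
```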

auvipy commented 4 years ago

Can you try celery master? Would be happy to discuss it in a PR.

hmatland commented 4 years ago

I used these packages:

[packages]
celery = {file = "https://github.com/celery/celery/zipball/master"}
billiard = {file = "https://github.com/celery/billiard/zipball/master"}
amqp = {file = "https://github.com/celery/py-amqp/zipball/master"}
kombu = {file = "https://github.com/celery/kombu/zipball/master"}
vine = {file = "https://github.com/celery/vine/zipball/master"}

Please correct me if this is not enough to try master.

The Minimally Reproducible Test Case is still valid. The task completes, but the RabbitMQ connection is dropped during shutdown, and the task is redelivered when the worker starts again.

[2020-05-15 09:06:48,758: INFO/MainProcess] Received task: heartbeat-test-task[3cf9e3b8-e138-4261-bb39-92a3b3f1e904]  
^C
worker: Hitting Ctrl+C again will terminate all running tasks!

worker: Warm shutdown (MainProcess)
[2020-05-15 09:07:18,771: INFO/ForkPoolWorker-1] Task heartbeat-test-task[3cf9e3b8-e138-4261-bb39-92a3b3f1e904] succeeded in 30.010418991999998s: 8

During this I see the following in the RabbitMQ logs:

2020-05-15 07:07:16.048 [error] <0.11333.5> closing AMQP connection <0.11333.5> (172.18.0.1:59872 -> 172.18.0.3:5672):
missed heartbeats from client, timeout: 10s

On starting the worker again, the same task is delivered again:

[2020-05-15 09:08:19,100: INFO/MainProcess] Received task: heartbeat-test-task[3cf9e3b8-e138-4261-bb39-92a3b3f1e904]  
[2020-05-15 09:08:49,217: INFO/ForkPoolWorker-1] Task heartbeat-test-task[3cf9e3b8-e138-4261-bb39-92a3b3f1e904] succeeded in 30.012629242s: 

Sadly, I would not know how to fix this, or even where to begin.

As originally stated, we now run without heartbeats, and have instead done some custom work to close "dangling" connections through the RabbitMQ management tools.
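For reference, a very rough sketch of what such a cleaner can look like against the RabbitMQ management HTTP API; the idleness criterion (no open channels and older than a threshold) and all values are assumptions for illustration, not what we actually run:

```python
# Hypothetical stale-connection cleaner using the RabbitMQ management HTTP API.
# Lists connections and force-closes those that look abandoned. The criterion
# below (zero channels and older than MAX_AGE_MS) is an illustrative guess.
import time

import requests  # third-party HTTP client

API = 'http://localhost:15672/api'       # management plugin endpoint (assumed)
AUTH = ('guest', 'guest')                 # credentials are placeholders
MAX_AGE_MS = 6 * 60 * 60 * 1000           # 6 hours, illustrative only

now_ms = time.time() * 1000
for conn in requests.get(f'{API}/connections', auth=AUTH).json():
    age_ms = now_ms - conn.get('connected_at', now_ms)
    if conn.get('channels', 0) == 0 and age_ms > MAX_AGE_MS:
        name = requests.utils.quote(conn['name'], safe='')
        requests.delete(f'{API}/connections/{name}', auth=AUTH,
                        headers={'X-Reason': 'stale connection cleanup'})
```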

auvipy commented 4 years ago

Thanks for the update.

crook commented 3 years ago

@hmatland Our team hit the same issue; just disabling the heartbeat works around it. Please try:

broker_heartbeat = 0

https://www.rabbitmq.com/heartbeats.html

auvipy commented 2 years ago

> @hmatland Our team hit the same issue; just disabling the heartbeat works around it. Please try:
>
> broker_heartbeat = 0
>
> https://www.rabbitmq.com/heartbeats.html

Can we close the issue based on that? Do you have any suggested changes in celery itself?

hmatland commented 2 years ago

> > @hmatland Our team hit the same issue; just disabling the heartbeat works around it. Please try:
> >
> > broker_heartbeat = 0
> >
> > https://www.rabbitmq.com/heartbeats.html
>
> Can we close the issue based on that? Do you have any suggested changes in celery itself?

I mentioned this workaround in my original issue. Disabling heartbeats potentially leaves stale connections in RabbitMQ and is not a great option: messages on stuck connections do not get redelivered until the connection is killed.

We had a separate stale connection cleaner running due to this.

We have since moved the workload to something other than Celery, so I do not know whether the original issue still exists on the latest release.

freedrikp commented 2 years ago

I can reproduce this with:

software -> celery:5.1.2 (sun-harmonics) kombu:5.1.0 py:3.8.12
            billiard:3.6.4.0 py-amqp:5.0.6
platform -> system:Linux arch:64bit, ELF
            kernel version:5.10.76-linuxkit imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:amqp results:disabled

deprecated_settings: None
broker_url: 'amqp://user:********@localhost:5672/vhost'
task_acks_late: True

and

software -> celery:5.2.3 (dawn-chorus) kombu:5.2.3 py:3.10.1
            billiard:3.6.4.0 py-amqp:5.0.9
platform -> system:Linux arch:64bit, ELF
            kernel version:5.10.76-linuxkit imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:amqp results:disabled

deprecated_settings: None
broker_url: 'amqp://user:********@localhost:5672/vhost'
task_acks_late: True

We are also forced to disable heartbeats because of this, which is not ideal.

lefterisnik commented 1 year ago

I can verify that this is still an issue on celery 5.2.x

Laza034 commented 10 months ago

Hey, any movement on this issue? We have 200k tasks impacted by this, and we cannot run with heartbeats disabled for RabbitMQ.

tdzienniak commented 6 months ago

This is really bad. Are there any plans to work on this issue?