Upon connection error the ongoing tasks are not being marked as processed #8541

acangiani opened 1 year ago

acangiani commented 1 year ago


These two seem somewhat related to the issue we are facing.

Celery version: 5.3.4

celery report Output:

Steps to Reproduce

pip freeze Output:
```
amqp==5.1.1
asgiref==3.7.2
billiard==4.1.0
celery==5.3.4
click==8.1.7
click-didyoumean==0.3.0
click-plugins==1.1.1
click-repl==0.3.0
Django==4.2.5
flower==2.0.1
humanize==4.8.0
kombu==5.3.2
prometheus-client==0.17.1
prompt-toolkit==3.0.39
python-dateutil==2.8.2
pytz==2023.3.post1
six==1.16.0
sqlparse==0.4.4
tornado==6.3.3
typing_extensions==4.8.0
tzdata==2023.3
vine==5.0.0
wcwidth==0.2.6
```

Minimally Reproducible Test Case

1. Clone the PoC repo.
2. Start all of the components with `docker compose up`.
3. Enqueue several instances of `long_running_task`, e.g. by running the following a few times:
```python
from sample.celery import long_running_task
long_running_task.delay()
```
4. Restart `RabbitMQ` while one of the tasks is being processed with `docker restart <container_id>`.

Alternatively:
1. Create a long-running task:
```python
import time

from celery import Celery

# Assumption: any Celery app connected to RabbitMQ; "sample" mirrors the PoC module name.
app = Celery("sample")


@app.task()
def long_running_task():
    print("long running task: start")
    time.sleep(30)
    print("long running task: end")
    return "long running task: done"
```
2. Run several instances of `long_running_task`.
3. Restart RabbitMQ while one of those tasks is being processed.

Expected Behavior

  1. The long-running task should be successfully processed. Because we are not using `CELERY_TASK_ACKS_LATE=True`, the pool worker should not be restarted or stopped.
  2. The worker should respect the max concurrency setting.
  3. Tasks that started before the connection restart but completed after it should be considered done, not active.
  4. The worker main process should emit the task-succeeded event.

Actual Behavior

  1. The long-running task is successfully processed.
  2. The worker does not respect the max concurrency setting: it reports 3 active tasks although concurrency is 2.
  3. Tasks that started before the connection restart but completed after it are still reported as active.
  4. The worker main process is not emitting the task-succeeded event.
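To spot the symptom in point 2 programmatically, one can compare the number of active tasks per worker with the pool's concurrency limit. The helper below is hypothetical (`find_overcommitted_workers` is not part of Celery) and works on plain dictionaries. It assumes they have the shape returned by `app.control.inspect().active()` (worker name to list of task dicts) and `app.control.inspect().stats()` (worker name to a stats dict whose `pool` entry contains `max-concurrency` for the prefork pool). The task IDs `task-b` and `task-c` are placeholders.

```python
from __future__ import annotations

from typing import Any


def find_overcommitted_workers(
    active: dict[str, list[dict[str, Any]]],
    stats: dict[str, dict[str, Any]],
) -> dict[str, tuple[int, int]]:
    """Return {worker: (active_count, max_concurrency)} for workers that
    report more active tasks than their pool can run at once.

    Hypothetical helper: `active` is assumed to look like the result of
    inspect().active() and `stats` like the result of inspect().stats().
    """
    overcommitted: dict[str, tuple[int, int]] = {}
    for worker, tasks in active.items():
        # Workers without pool stats (or without a concurrency limit) are skipped.
        limit = stats.get(worker, {}).get("pool", {}).get("max-concurrency")
        if limit is not None and len(tasks) > limit:
            overcommitted[worker] = (len(tasks), limit)
    return overcommitted


# Data shaped like the case documented in this issue: 3 active tasks, concurrency 2.
active = {
    "celery@ccd33a7f7638": [
        {"id": "2cbdb8e8-aa29-45a1-aee1-e9afa244ea10", "name": "sample.celery.long_running_task"},
        {"id": "task-b", "name": "sample.celery.long_running_task"},  # placeholder id
        {"id": "task-c", "name": "sample.celery.long_running_task"},  # placeholder id
    ]
}
stats = {"celery@ccd33a7f7638": {"pool": {"max-concurrency": 2}}}
print(find_overcommitted_workers(active, stats))
```

In a live setup, the two dictionaries would come from the worker inspection calls named above rather than being written by hand.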

Example of the actual behavior with a documented case:

  1. Settings of worker celery@ccd33a7f7638 (concurrency=2):

  2. The ghost task `long_running_task` with UUID 2cbdb8e8-aa29-45a1-aee1-e9afa244ea10 is still reported as active after RabbitMQ restarted, even though the logs show that it succeeded:

  3. Summarized logs of the task with UUID `2cbdb8e8-aa29-45a1-aee1-e9afa244ea10`, which goes from `received` to `succeeded` with a `RabbitMQ` connection restart in between:

```
sample-default-worker-1 | [2023-09-27 21:22:29,251: INFO/MainProcess] Task sample.celery.long_running_task[2cbdb8e8-aa29-45a1-aee1-e9afa244ea10] received
...
sample-default-worker-1 | [2023-09-27 21:23:12,642: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:@rabbitmq:5672//: [Errno 111] Connection refused.
...
Trying again in 4.00 seconds... (2/100)
...
sample-default-worker-1 | [2023-09-27 21:23:22,689: INFO/MainProcess] Connected to amqp://guest:@rabbitmq:5672//
...
sample-default-worker-1 | [2023-09-27 21:23:28,310: INFO/ForkPoolWorker-2] Task sample.celery.long_running_task[2cbdb8e8-aa29-45a1-aee1-e9afa244ea10] succeeded in 30.0159327230067s: 'long running task: done'
```

  4. Full logs of the task with UUID `2cbdb8e8-aa29-45a1-aee1-e9afa244ea10`, which goes from `received` to `succeeded` with a `RabbitMQ` connection restart in between:
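Ghost tasks like this one can be found by cross-checking the worker logs against the list of task IDs the worker reports as active. The sketch below is hypothetical (`last_event_per_task` and `find_ghost_tasks` are not Celery APIs) and assumes the default worker log format seen in the logs above, where lifecycle lines look like `Task <name>[<id>] received` and `Task <name>[<id>] succeeded in ...`.

```python
from __future__ import annotations

import re
from typing import Iterable

# Matches "Task <name>[<id>] received" / "Task <name>[<id>] succeeded ..." lines.
# Assumption: the default worker log format used in this issue's logs.
TASK_EVENT = re.compile(
    r"Task (?P<name>[\w.]+)\[(?P<id>[0-9a-f-]+)\] (?P<event>received|succeeded)\b"
)


def last_event_per_task(lines: Iterable[str]) -> dict[str, str]:
    """Map each task id to the last lifecycle event seen for it in the logs."""
    events: dict[str, str] = {}
    for line in lines:
        match = TASK_EVENT.search(line)
        if match:
            events[match["id"]] = match["event"]
    return events


def find_ghost_tasks(lines: Iterable[str], active_ids: Iterable[str]) -> set[str]:
    """Task ids still reported as active although the logs say they succeeded."""
    events = last_event_per_task(lines)
    return {task_id for task_id in active_ids if events.get(task_id) == "succeeded"}


logs = [
    "sample-default-worker-1 | [2023-09-27 21:22:29,251: INFO/MainProcess] Task sample.celery.long_running_task[2cbdb8e8-aa29-45a1-aee1-e9afa244ea10] received",
    "sample-default-worker-1 | [2023-09-27 21:23:22,689: INFO/MainProcess] Connected to amqp://guest:@rabbitmq:5672//",
    "sample-default-worker-1 | [2023-09-27 21:23:28,310: INFO/ForkPoolWorker-2] Task sample.celery.long_running_task[2cbdb8e8-aa29-45a1-aee1-e9afa244ea10] succeeded in 30.0159327230067s: 'long running task: done'",
]
print(find_ghost_tasks(logs, ["2cbdb8e8-aa29-45a1-aee1-e9afa244ea10"]))
```

The active IDs would normally come from the worker's own report of active tasks; here the single UUID from this issue is passed in by hand.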

Possible solution

While debugging, we found a way to fix this, but we are not sure whether it's the right way or whether it has unintended consequences.

  1. Do not clear `_cache`:
```python
# celery/concurrency/asynpool.py:L1020
    def flush(self):
        if self._state == TERMINATE:
            return
        # cancel all tasks that haven't been accepted so that NACK is sent
        # if synack is enabled.
        if self.synack:
            for job in self._cache.values():
                if not job._accepted:
                    job.cancel()

        # clear the outgoing buffer as the tasks will be redelivered by
        # the broker anyway.
        if self.outbound_buffer:
            self.outbound_buffer.clear()

        # ...

            # ...but we must continue writing the payloads we already started
            # ...
            if self._state == RUN:
                # flush outgoing buffers
                # ...

                # TODO: Rewrite this as a dictionary comprehension once we drop support for Python 3.7
                owned_by = {}
                for job in self._cache.values():
                    writer = _get_job_writer(job)
                    if writer is not None:
                        owned_by[writer] = job

                if not self._active_writers:
                    pass  # self._cache.clear()  <- proposed change: do not clear the cache
                # ...
```

Alternatively, avoid flushing the pool altogether when the consumer disconnects:

```python
# celery/worker/consumer/consumer.py:L445
    def on_close(self):
        # Clear internal queues to get rid of old messages.
        if self.controller and self.controller.semaphore:
            self.controller.semaphore.clear()
        if self.timer:
            self.timer.clear()
        for bucket in self.task_buckets.values():
            if bucket:
                bucket.clear_pending()
        for request_id in reserved_requests:
            if request_id in requests:
                del requests[request_id]
        # ...
        #     self.pool.flush()
```

With either change, the tasks are correctly counted as processed and are no longer reported as active, because `_cache` no longer raises a `KeyError` when the job is done.
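The mechanism described above can be illustrated with a deliberately simplified toy model. It is not Celery's or billiard's code: `ToyPool`, `apply_async`, `flush` and `on_job_done` here are hypothetical names. It only shows how wiping a job-tracking dict while a job is still running leaves that job unaccounted for: the lookup on completion fails with `KeyError`, so the bookkeeping that would mark the task as no longer active never runs.

```python
from __future__ import annotations


class ToyPool:
    """Hypothetical, heavily simplified model of a pool's job bookkeeping."""

    def __init__(self) -> None:
        self._cache: dict[int, str] = {}  # job id -> task id, for jobs in flight
        self.active: set[str] = set()     # task ids reported as active
        self.succeeded: list[str] = []    # task ids whose success was recorded

    def apply_async(self, job_id: int, task_id: str) -> None:
        self._cache[job_id] = task_id
        self.active.add(task_id)

    def flush(self, clear_cache: bool) -> None:
        # Models the connection-loss path: clear_cache=True mimics
        # self._cache.clear(), clear_cache=False mimics the proposed change.
        if clear_cache:
            self._cache.clear()

    def on_job_done(self, job_id: int) -> None:
        try:
            task_id = self._cache.pop(job_id)
        except KeyError:
            # The job finished but is no longer tracked, so nothing is updated
            # and the task stays in `active` forever (a "ghost" task).
            return
        self.active.discard(task_id)
        self.succeeded.append(task_id)


for clear_cache in (True, False):
    pool = ToyPool()
    pool.apply_async(1, "2cbdb8e8")
    pool.flush(clear_cache=clear_cache)  # RabbitMQ restarts mid-task
    pool.on_job_done(1)                  # the task finishes afterwards
    print(clear_cache, pool.active, pool.succeeded)
```

With `clear_cache=True` the task stays active and is never recorded as succeeded; with `clear_cache=False` it is accounted for normally.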

One thing that puzzles me: if we are not using `acks_late`, why do we need to flush the worker pool at all? My feeling is that the consumer connection and the pool should be as independent as possible.

  2. Regarding the succeeded event, we found this workaround:
```python
# celery/events/dispatcher.py:L215
    def extend_buffer(self, other):
        """Copy the outbound buffer of another instance."""
        self._group_buffer = other._group_buffer  # Keep the other/prev messages buffer
```

When the `EventDispatcher` is flushed, we need to keep a reference to the previous `_group_buffer` so that jobs that were ongoing before the connection restart still point to the correct buffer.
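The workaround relies on the difference between copying a buffer's contents and sharing the buffer object itself. The toy below is hypothetical: `ToyDispatcher`, `buffer_for` and its `share` flag are not Celery's `EventDispatcher` API, and only the `_group_buffer` name is borrowed from the snippet above. A job that captured a reference to the old dispatcher's buffer before the reconnect keeps appending to that object, so the new dispatcher only sees those events if it reuses the same object.

```python
from __future__ import annotations

from collections import defaultdict


class ToyDispatcher:
    """Hypothetical stand-in for an event dispatcher with per-group buffers."""

    def __init__(self) -> None:
        self._group_buffer: defaultdict[str, list[str]] = defaultdict(list)

    def buffer_for(self, group: str) -> list[str]:
        # Callers (e.g. in-flight jobs) may hold on to this list object.
        return self._group_buffer[group]

    def extend_buffer(self, other: ToyDispatcher, share: bool) -> None:
        if share:
            # Same idea as the workaround: reuse the previous buffer object.
            self._group_buffer = other._group_buffer
        else:
            # Copy only what is buffered right now; later appends to the old
            # lists are invisible to this dispatcher.
            for group, events in other._group_buffer.items():
                self._group_buffer[group].extend(events)

    def flush(self, group: str) -> list[str]:
        events = list(self._group_buffer[group])
        self._group_buffer[group].clear()
        return events


for share in (False, True):
    old = ToyDispatcher()
    job_buffer = old.buffer_for("task")  # job starts before the restart
    new = ToyDispatcher()                # connection restarts
    new.extend_buffer(old, share=share)
    job_buffer.append("task-succeeded")  # job finishes after the restart
    print(share, new.flush("task"))
```

Only the sharing variant delivers the event appended after the restart; the copying variant loses it.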

We are unsure whether 1 and 2 are real solutions or whether we are overlooking other use cases or side effects. Any help and feedback are more than welcome!

acangiani commented 1 year ago

@thedrow Upon further investigation, I think this is related to your PR https://github.com/celery/celery/pull/6863. If possible, could you take a look at this issue?

auvipy commented 1 year ago

Hey, Agustin! Can you come up with a PR with an extensive test case and a prospective fix? The old PR does not have any unit tests, so it is difficult to verify. Thanks for your investigation so far. We can discuss further on the draft/in-progress PR.