YahelOpp opened 9 months ago
The patch I've done that makes it work as expected is:

```python
# Patch a fix for the SQS channel: after restoring a redelivered
# message, explicitly reset its visibility timeout to 0.
from kombu.transport.SQS import Transport

original_put = Transport.Channel._put

def patched_put(self, queue, message, **kwargs):
    original_put(self, queue, message, **kwargs)
    if message.get("redelivered"):
        q_url = self._new_queue(queue)
        c = self.sqs(queue=self.canonical_queue_name(queue))
        c.change_message_visibility(
            QueueUrl=q_url,
            ReceiptHandle=message["properties"]["delivery_tag"],
            VisibilityTimeout=0,
        )

Transport.Channel._put = patched_put
```
I think this is a similar issue to what we're seeing, but in our case it happens during a cold shutdown. Our scenario: we see the `Restoring X unacknowledged message(s)` warning in the log.

Relevant settings:

- `task_reject_on_worker_lost: True`
- `task_acks_late: True`
- `ReceiveMessageWaitTimeSeconds` on the queue: 20 seconds

From my investigation it seems to be caused by a race condition due to the long polling and/or multiple workers (concurrency > 1). In CloudTrail I can see that the `ChangeMessageVisibility` call happens correctly, but there is also another `ReceiveMessage` call up to 3 seconds later from the same host, which puts the message back into the invisible state.
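The race described above can be modeled with a tiny in-memory simulation (purely illustrative; the class and function names are mine, not kombu's or boto3's): a long poll that is already in flight when the shutdown resets visibility to 0 can still deliver the message and restart the full visibility timeout.

```python
# Illustrative model of the suspected race, not kombu/boto3 code.
VISIBILITY_TIMEOUT = 1800  # 30 minutes, as in our queue configuration


class Message:
    def __init__(self):
        self.invisible_until = 0.0  # timestamp until which the message is hidden


def change_visibility(msg, now, timeout):
    # Models the ChangeMessageVisibility API call.
    msg.invisible_until = now + timeout


def receive(msg, now):
    # Models ReceiveMessage: delivers the message only if it is visible,
    # and hides it again for the full visibility timeout.
    if now >= msg.invisible_until:
        change_visibility(msg, now, VISIBILITY_TIMEOUT)
        return True
    return False


msg = Message()
change_visibility(msg, now=0.0, timeout=0)  # shutdown restores the message
redelivered = receive(msg, now=3.0)         # in-flight long poll fires 3 s later
print(redelivered, msg.invisible_until)     # prints: True 1803.0
```

The message ends up hidden for the full 30 minutes again, with no worker left running to process it, which matches the stuck `NotVisible` state observed in CloudTrail.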
Work-arounds I've found:

- Setting `concurrency` to 1 prevents this from happening, but that's not really a solution for production environments.

I think this could be easily prevented by adjusting the `ChangeMessageVisibility` call to set `VisibilityTimeout` to whatever the polling interval is, instead of 0. That would hide the message from any in-flight long-polling requests (including those from the other workers that are currently being shut down) long enough for the shutdown to complete. Even if the polling interval is set to the maximum of 20 seconds, that's usually still much better than whatever `VisibilityTimeout` is set on the queue (which is 30 minutes in our case because of long-running tasks!).
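A minimal sketch of that adjustment, assuming direct access to a boto3 SQS client (the helper name and the constant are illustrative, not existing kombu code):

```python
# Hypothetical sketch: hide a restored message for the long-poll window
# instead of making it visible immediately.
POLL_WAIT_SECONDS = 20  # matches ReceiveMessageWaitTimeSeconds on the queue


def restore_visibility(sqs_client, queue_url, receipt_handle):
    # Instead of VisibilityTimeout=0, keep the message hidden just long
    # enough for any in-flight ReceiveMessage long polls to expire, so
    # they cannot re-hide it for the full queue visibility timeout.
    sqs_client.change_message_visibility(
        QueueUrl=queue_url,
        ReceiptHandle=receipt_handle,
        VisibilityTimeout=POLL_WAIT_SECONDS,
    )
```

The trade-off is a delay of up to `POLL_WAIT_SECONDS` before the message becomes consumable again, which is far shorter than the queue-level visibility timeout.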
Obviously, a better solution would be to restore the messages after all workers under the master process finish/stop their `ReceiveMessage` requests - but I'm not sure if that's possible with boto?
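Since boto3 offers no way to cancel an in-flight long poll, one approximation of "restore after all `ReceiveMessage` requests finish" is to stop issuing new polls and then wait out the long-poll window before restoring. A rough sketch with hypothetical callables (not an existing kombu or boto3 hook):

```python
import time

RECEIVE_WAIT_SECONDS = 20  # ReceiveMessageWaitTimeSeconds on the queue


def restore_after_drain(restore_fn, stop_consumers_fn,
                        wait_seconds=RECEIVE_WAIT_SECONDS):
    # 1. Stop issuing new ReceiveMessage calls from this process.
    stop_consumers_fn()
    # 2. Wait for any already in-flight long polls to time out server-side.
    time.sleep(wait_seconds)
    # 3. Only now make the unacknowledged messages visible again.
    restore_fn()
```

This adds up to 20 seconds to shutdown, but guarantees no poll started before the restore can re-hide the message.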
Possibly related to: https://github.com/celery/celery/discussions/8583
```
software -> celery:5.3.6 (emerald-rush) kombu:5.3.7 py:3.9.19
            billiard:4.2.0 sqs:N/A
platform -> system:Linux arch:64bit, ELF
            kernel version:5.10.218-208.862.amzn2.x86_64 imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:sqs results:disabled
```
Potentially fixed with https://github.com/celery/celery/pull/9213 (although it’s in Celery, not Kombu) @YahelOpp
Celery v5.5.0b3 released.
Versions:
After seeing messages stuck in `NotVisible` state at our production queues, I've set out to verify that the warm shutdown mechanism works properly. My test setup:

What I see happen:

- The message is received and moves to the `NotVisible` state
- Warm shutdown
- The message stays in the `NotVisible` state

Expected behavior: the message should move to the `Visible` state.

A possible explanation I've found: by manually changing the code in `kombu/transport/SQS.py`, at the `_put` function in the `message.get('redelivered')` case, adding a duplicate call to `change_message_visibility` - the problem seemed to go away. I've tested this against real live SQS and against Localstack. In both, the problem was consistent and was solved once I added the duplicate call.

To sum up, the problem might be on boto3's side and not in kombu, but I'm unsure about this. I will be happy to assist and make a PR / provide a simple testing scenario.
Thanks