taskiq-python / taskiq

Distributed task queue with full async support
MIT License

Do not ACK errored messages #371

Closed danngalann closed 2 weeks ago

danngalann commented 1 month ago

I had an issue with my FastAPI project where a task was throwing an exception, yet the message was still being acknowledged and lost forever. I tried using the context inside the task to reject the message as shown here (see the sketch after the code below), but the task seemed to hang while trying to resolve the context dependency. I ended up implementing a fix that I'll describe below.

For reference, here's a simplified version of my code:

# broker.py
import taskiq_fastapi
from taskiq_aio_pika import AioPikaBroker

from settings import settings

broker = AioPikaBroker(
    settings.rabbit_dsn,
    queue_name=f"myapp_{settings.environment}",
    exchange_name=f"myapp_{settings.environment}",
    dead_letter_queue_name=f"myapp_{settings.environment}.dead_letter",
    delay_queue_name=f"myapp_{settings.environment}.delay",
    qos=1,  # Consume one message at a time
)
# myservice.py
from broker import broker  # the broker configured above


@broker.task
async def store(
    task_data,
) -> None:
    service = SomeService()
    # fetch_data_rpc raises an exception here, yet the message is still ACKed
    data = await service.fetch_data_rpc(
        task_data
    )
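The context-based rejection I tried first (before landing on the fix described below) looked roughly like the sketch here. It is not my exact code: it uses taskiq's documented Context/TaskiqDepends injection, and the actual reject call is omitted because the task hung before ever reaching it.

from taskiq import Context, TaskiqDepends

from broker import broker


@broker.task
async def store(
    task_data,
    context: Context = TaskiqDepends(),  # resolving this dependency is where the task hung
) -> None:
    ...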

In the store task above, the fetch_data_rpc call throws an exception. I'd expect the message to be requeued, routed to the dead letter queue configured on the broker, or at the very least not acknowledged, but none of that happens.

I'm not sure whether this is a bug or intended behavior, since I can see in your receiver.py that messages are acknowledged regardless of errors:

if self.ack_time == AcknowledgeType.WHEN_RECEIVED and isinstance(
    message,
    AckableMessage,
):
    await maybe_awaitable(message.ack())

This may be related to this issue.

In the end, my solution was to wrap the ack call in a check for whether the task raised an error. Now, if a task fails, its message is not acknowledged, so failed messages are no longer lost.
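Conceptually, the change is something like the sketch below. It is not the exact diff from my PR; it reuses the names from the receiver.py snippet above and assumes the post-execution ack branch (AcknowledgeType.WHEN_EXECUTED) and the is_err flag on the task's TaskiqResult.

# Only acknowledge after execution if the task did not raise.
if (
    self.ack_time == AcknowledgeType.WHEN_EXECUTED
    and isinstance(message, AckableMessage)
    and not result.is_err  # result: TaskiqResult of the just-executed task
):
    await maybe_awaitable(message.ack())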

I'm sending my fix as a PR in case you consider it useful for use cases other than mine.

Sobes76rus commented 1 month ago

Yes, it is intended behavior. If you wish to retry on error, you could implement a RetryMiddleware for AioPikaBroker or define your own ack method for the message. It is a problem with AioPikaBroker, not the Receiver.
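A minimal sketch of the middleware approach, using the SimpleRetryMiddleware bundled with taskiq (treat it as an illustration and check the import path and parameter names against your taskiq version):

from taskiq import SimpleRetryMiddleware
from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker("amqp://guest:guest@localhost:5672/").with_middlewares(
    SimpleRetryMiddleware(default_retry_count=3),
)


@broker.task(retry_on_error=True)  # opt this task into retries
async def store(task_data) -> None:
    ...  # if this raises, the middleware re-enqueues the task up to 3 times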

s3rius commented 2 weeks ago

I agree with @Sobes76rus. The thing is that the ack mechanism is intended to guard against connection issues. For example, if a worker went down while reading a message and couldn't process it, the message would be handed to another worker. But once a message has been received, it's up to the application to handle exceptions.