mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0
1.18k stars 186 forks source link

What is the precise definition of the term "processed"? #605

Closed RonaldGalea closed 6 months ago

RonaldGalea commented 6 months ago

The message process context manager logic revolves around the ignore_processed flag, which of course depends on the processed attribute of a message.

What does this mean exactly? I can't really find the term in RabbitMQ docs, so I guess it somehow is specific to the implementation of this library. Semantically, what does it mean for a message to be processed? When does it become processed?

I have a small follow-up question on this as well. My goal is to process messages transactionally, if any exception occurs, roll everything back and just requeue the message. Does the following suffice for this?

async with queue.iterator() as queue_iterator:
    async for message in queue_iterator:
        async with channel.transaction():
            # do whatever work with the message
            await message.ack()

The process context manager should not even be needed here, right?

Thank you

mosquito commented 6 months ago

The process context manager should not even be needed here, right?

Short answer: If you don't want to handle exceptions in the do whatever work with the message block then yes. But it's a worst idea IMHO.

Following example is completely what the process does:

async with queue.iterator() as queue_iterator:
    async for message in queue_iterator:
        async with channel.transaction():
            try:
                 # do whatever work with the message
            except Exception:
                 await message.reject()
            else:
                 await message.ack()
RonaldGalea commented 6 months ago

Thanks a lot for the reply.

So essentially the broker should always be sent a response, either ack or reject. Otherwise, if there is an unhandled error and the worker does not reply anything, the broker will just consider this message as being processed until some timeout is eventually hit.