mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0
1.18k stars 186 forks

Ack and reject outside of IncomingMessage.process() #606

Closed: kimdre closed this 6 months ago

kimdre commented 6 months ago

Hi,

Can you ack or reject messages outside of the context manager used for processing the message? I collect messages and write them to a database in bulk at intervals, so my context manager only contains a list append. The actual processing happens in a different loop/context, and that is where I would like to send the ack, once the processing there has actually succeeded.

Unfortunately, the documentation could not help me.

My process context manager looks like this:

async def on_message(self, message: AbstractIncomingMessage):
        """
        Callback function for when a message is received

        :param message: Message received
        :return: None
        """
        async with message.process(ignore_processed=True):
            logger.debug(f"Received message: {message.body}")
            try:
                self.bulk_messages.append(message)

            except Exception as e:
                logger.error(f"Failed to process message: {e}")
                self.failed_messages.append(message)

And the function that does the actual processing and sends the ack if everything was fine:

async def write_bulk_to_database(self):
        """
        Write messages to database

        :return: None
        """
        if len(self.bulk_messages) >= settings.app_min_queue_size:
            message_list_chunks = list(self._divide_chunks(self.bulk_messages, settings.app_db_max_batch_size))

            for message_list_chunk in message_list_chunks:
                messages = []

                message: AbstractIncomingMessage
                for message in message_list_chunk:
                    try:
                        event = await self.event_types.convert(
                            json.loads(message.body.decode())
                            )
                        messages.append(event.model_dump())
                    except Exception as e:
                        logger.exception(e)
                        self.failed_messages.append(message)
                if not await save_events(messages):
                    # Track the original message objects, not the converted dicts
                    self.failed_messages.extend(message_list_chunk)
                else:
                    for message in message_list_chunk:
                        await message.ack()
                    self.bulk_messages = []

It seems that message.ack() does not work here: I get a MessageProcessError that says "Message already processed".

kimdre commented 6 months ago

OK, I was able to fix it myself. I didn't need the line below: with it, every message is acked or rejected as soon as the context manager exits, depending on whether the block returned normally or raised an exception. In my case that sent an ack the moment I appended the message to the list, so the later message.ack() hit an already processed message.

async with message.process(ignore_processed=True):
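
For anyone landing here later, the fix can be sketched roughly as follows. This is a minimal sketch, not the code from my application: `BulkConsumer`, `flush`, `FakeMessage` and the `save_events` stub are hypothetical names made up for illustration, and `FakeMessage` only stands in for a real aio-pika incoming message so the example runs without a broker. The only aio-pika calls it assumes are `message.ack()` and `message.reject(requeue=...)`.

```python
import asyncio
from typing import Awaitable, Callable, List, Protocol


class AckableMessage(Protocol):
    """The small part of an incoming message this sketch relies on."""

    body: bytes

    async def ack(self, multiple: bool = False) -> None: ...

    async def reject(self, requeue: bool = False) -> None: ...


class BulkConsumer:
    """Hypothetical consumer that buffers messages and acks them after a bulk write."""

    def __init__(
        self,
        save_events: Callable[[List[bytes]], Awaitable[bool]],
        batch_size: int,
    ) -> None:
        self.save_events = save_events
        self.batch_size = batch_size
        self.bulk_messages: List[AckableMessage] = []

    async def on_message(self, message: AckableMessage) -> None:
        # No `async with message.process()` here, so nothing is acked yet.
        self.bulk_messages.append(message)

    async def flush(self) -> None:
        # Swap the buffer first so messages arriving during the write are kept.
        pending, self.bulk_messages = self.bulk_messages, []
        for start in range(0, len(pending), self.batch_size):
            chunk = pending[start:start + self.batch_size]
            ok = await self.save_events([m.body for m in chunk])
            for m in chunk:
                if ok:
                    await m.ack()  # ack only after the write succeeded
                else:
                    await m.reject(requeue=True)  # hand the message back to the broker


class FakeMessage:
    """Stand-in for a real incoming message; records which call it received."""

    def __init__(self, body: bytes) -> None:
        self.body = body
        self.state = "unacked"

    async def ack(self, multiple: bool = False) -> None:
        self.state = "acked"

    async def reject(self, requeue: bool = False) -> None:
        self.state = "requeued" if requeue else "rejected"


async def demo() -> List[str]:
    async def save_events(bodies: List[bytes]) -> bool:
        # Stub database write: a chunk fails if it contains b"bad".
        return b"bad" not in bodies

    consumer = BulkConsumer(save_events, batch_size=2)
    messages = [FakeMessage(b) for b in (b"a", b"b", b"bad", b"c")]
    for m in messages:
        await consumer.on_message(m)
    await consumer.flush()
    return [m.state for m in messages]


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['acked', 'acked', 'requeued', 'requeued']
```

With a real queue, the callback would be registered with something like `await queue.consume(consumer.on_message)`, leaving `no_ack` at its default of `False`. One thing to watch: unacked messages count against the channel's prefetch limit, so if you set one via `channel.set_qos(prefetch_count=...)`, it probably needs to be at least as large as the batch you wait for, otherwise the broker stops delivering before the batch fills up.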