qweeze / rstream

A Python asyncio-based client for RabbitMQ Streams
MIT License

Preventing Excessive Callback Invocations After Consumer Closure in `consumer.subscribe` #176

Closed GrzegorzPustulka closed 1 month ago

GrzegorzPustulka commented 8 months ago

I ran into a problem when storing the offset from inside the _callback method. Once the condition self.received_messages >= self.top is met, I call store_offset and then self.consumer.close(). This does close the consumer: control returns to consume_with_store_offset, which returns the string "example". However, even after the consumer is closed and the string is returned, _callback keeps being invoked up to offset 31 (and if I send enough messages to reach that offset, it then continues to be called up to offset 95). As a temporary workaround, I added a guard condition so that the extra calls skip all the work, but this is not an optimal solution. Is there any way to avoid these unnecessary _callback invocations? I also tried setting the initial_credit parameter to self.top so that the offset advances only by the desired number, but this does not help.

code snippet:

    async def consume_with_store_offset(self, top: int) -> str:
        self.top = top
        await self.consumer.start()

        try:
            my_offset = await self.consumer.query_offset(stream=self.stream, subscriber_name="subscriber")
        except OffsetNotFound:
            my_offset = 0

        await self.consumer.subscribe(
            stream=self.stream,
            subscriber_name="subscriber",
            callback=self._callback,
            offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, my_offset),
        )
        await self.consumer.run()
        return "example"

    async def _callback(self, msg: AMQPMessage, message_context: MessageContext):
        if not self.is_limit_reached:
            offset = message_context.offset
            subscriber_name = message_context.subscriber_name
            self.received_messages += 1
            log.debug(f"Received message: {msg} from stream: {self.stream} at offset: {offset}.")
            if self.received_messages >= self.top:
                self.is_limit_reached = True
                await self.consumer.store_offset(self.stream, subscriber_name, offset)
                await self.consumer.close()

DanielePalaia commented 8 months ago

Hi @GrzegorzPustulka, thank you for opening this issue. Feedback from real-world use cases is important for improving this project. Feel free to open new issues if you run into further problems.

Unfortunately, at the moment we are spending a lot of effort reviewing and reimplementing the MetadataUpdate/disconnection scenarios, and we don't have any more bandwidth to work on this project.

We will have a look as soon as we can. In the meantime, if you feel you can attempt a PR, it is very welcome!

DanielePalaia commented 5 months ago

Hi @GrzegorzPustulka, I had a look at this issue.

Unfortunately I don't think there is much we can really do in the library.

The messages consumed that you see after you close the consumer are the ones remaining in the consumed chunk.

This can also be seen using the simple example in the library.

A separate thread in the library consumes all the messages in the chunk and invokes the callback for every message. By the time you close the consumer, the callback has already been invoked several times for the other messages in the chunk.

You can see this at line 371 of the consumer (the _on_deliver method), which loops over the messages of the chunk. _on_deliver is registered as a handler at line 249.
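
To illustrate the behaviour described above, here is a minimal, self-contained sketch that does not use rstream at all. The deliver_chunk function and the LimitedReader class are hypothetical stand-ins, not the library's API: deliver_chunk only imitates a loop that invokes the callback once per message in an already-received chunk, and the 32-message chunk size is an assumption chosen to match the "offset 31" observation in the report. It shows why the callback keeps firing after the limit is reached, and how a guard flag keeps those extra calls cheap.

```python
import asyncio
from typing import Awaitable, Callable, List

Callback = Callable[[str, int], Awaitable[None]]


async def deliver_chunk(messages: List[str], first_offset: int, callback: Callback) -> None:
    """Hypothetical stand-in for a chunk loop: one callback call per message.

    The whole chunk is already in memory, so nothing the callback does
    stops the remaining iterations.
    """
    for i, body in enumerate(messages):
        await callback(body, first_offset + i)


class LimitedReader:
    """Handles at most `top` messages and ignores the rest of the chunk."""

    def __init__(self, top: int) -> None:
        self.top = top
        self.processed: List[int] = []  # offsets that were actually handled
        self.ignored = 0                # callback calls skipped by the guard
        self.done = asyncio.Event()     # set once the limit is reached

    async def callback(self, body: str, offset: int) -> None:
        if self.done.is_set():
            # Leftover message from the same chunk: return immediately.
            self.ignored += 1
            return
        self.processed.append(offset)
        if len(self.processed) >= self.top:
            # In the real code this is where the offset would be stored
            # and the consumer closed.
            self.done.set()


async def main() -> LimitedReader:
    reader = LimitedReader(top=3)
    chunk = [f"msg-{i}" for i in range(32)]  # assumed chunk of 32 messages
    await deliver_chunk(chunk, 0, reader.callback)
    return reader


reader = asyncio.run(main())
print(reader.processed, reader.ignored)  # [0, 1, 2] 29
```

With top=3, only offsets 0 to 2 are handled and the remaining 29 calls return at once. This is essentially the is_limit_reached workaround from the original report: it does not prevent the calls, it only makes them do no work.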

DanielePalaia commented 1 month ago

I will close this one, as we didn't receive any feedback from the community and it is not planned to be implemented.