qweeze / rstream

A Python asyncio-based client for RabbitMQ Streams
MIT License

Request for Callback Disconnection Feature After Receiving the Last Message #177

Closed GrzegorzPustulka closed 9 months ago

GrzegorzPustulka commented 9 months ago

Is it possible, or are there plans, to stop the callback passed to consumer.subscribe once the last message in the stream has been reached, instead of waiting for new messages? Alternatively, is there a way to get the number of messages in a stream, or the 'index' of the last message?

GrzegorzPustulka commented 9 months ago

The last index can be obtained by calling the 'subscribe' method with the offset set to 'Last' and closing the consumer from inside the callback with asyncio.create_task(consumer.close()). (ctx.offset has to be stored in a variable we define ourselves.) This works because messages are fetched in batches: even though the close is scheduled on the first callback, the callback still runs for the rest of the batch, so by the time the consumer is actually disconnected the stored value is the offset of the last message.
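To make the timing of this workaround concrete, here is a minimal, self-contained sketch of the pattern using plain asyncio. `FakeConsumer` and `FakeContext` are hypothetical stand-ins written for this illustration and are not rstream classes. The sketch assumes the behaviour described above: an already-fetched batch is delivered to the callback in full before the scheduled close takes effect. With the real library, the same callback would be passed to consumer.subscribe with the offset set to 'Last'.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List, Optional


@dataclass
class FakeContext:
    """Hypothetical stand-in for the context object given to the callback."""
    offset: int


Callback = Callable[[bytes, FakeContext], Awaitable[None]]


class FakeConsumer:
    """Hypothetical stand-in for a stream consumer (not the rstream API)."""

    def __init__(self, batch: List[bytes], first_offset: int) -> None:
        self._batch = batch
        self._first_offset = first_offset
        self._closed = asyncio.Event()

    async def subscribe(self, callback: Callback) -> None:
        # Assumption: the whole fetched batch is dispatched message by message.
        for i, body in enumerate(self._batch):
            await callback(body, FakeContext(self._first_offset + i))

    async def run(self) -> None:
        # Block until close() has been called.
        await self._closed.wait()

    async def close(self) -> None:
        self._closed.set()


async def find_last_offset(consumer: FakeConsumer, timeout: float = 1.0) -> Optional[int]:
    last_offset: Optional[int] = None
    close_task: Optional[asyncio.Task] = None

    async def on_message(body: bytes, ctx: FakeContext) -> None:
        nonlocal last_offset, close_task
        # Keep overwriting: the final value is the offset of the last message.
        last_offset = ctx.offset
        if close_task is None:
            # Schedule the close once; it only runs when the event loop is
            # next given control, i.e. after the current batch is processed.
            close_task = asyncio.create_task(consumer.close())

    await consumer.subscribe(on_message)
    try:
        # The timeout covers an empty stream, where no callback ever fires.
        await asyncio.wait_for(consumer.run(), timeout=timeout)
    except asyncio.TimeoutError:
        pass
    return last_offset


async def main() -> Optional[int]:
    consumer = FakeConsumer([b"a", b"b", b"c"], first_offset=40)
    return await find_last_offset(consumer)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Keeping a reference to the task returned by asyncio.create_task prevents it from being garbage-collected before it runs. The timeout is an addition for this sketch so that an empty stream does not block forever.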