qweeze / rstream

A Python asyncio-based client for RabbitMQ Streams
MIT License

Issue when unsubscribing/subscribing again a subscriber_name #198

Closed ptoews closed 3 months ago

ptoews commented 3 months ago

Hi,

We would like to seek to a different timestamp while a subscription is already running. I simplified our current approach into this script:

import asyncio
import datetime

import rstream as rs

STREAM = "hello-stream"
APP_ID = "abcdef123"

async def seek_somewhere(consumer: rs.Consumer):
    await asyncio.sleep(1)
    print("SEEKING NOW")
    await consumer.unsubscribe(APP_ID)

    # await self._setup()
    timestamp = int(datetime.datetime(2023, 7, 18,
                                      tzinfo=datetime.timezone.utc).timestamp())
    offset = rs.ConsumerOffsetSpecification(rs.OffsetType.TIMESTAMP, timestamp)
    await consumer.subscribe(
        stream=STREAM,
        subscriber_name=APP_ID,
        callback=print,
        decoder=rs.amqp_decoder,
        offset_specification=offset
    )
    await consumer.start()
    print("finished seeking")

async def main():
    consumer = rs.Consumer(
        host="localhost",
        username="guest",
        password="guest",
        connection_name=APP_ID
    )
    await consumer.subscribe(
        stream=STREAM,
        subscriber_name=APP_ID,
        callback=print,
        decoder=rs.amqp_decoder,
    )
    task = asyncio.create_task(seek_somewhere(consumer))
    await consumer.run()
    await task

if __name__ == '__main__':
    asyncio.run(main())

However, this doesn't work. Messages are received until the seek is attempted; after that, no more messages arrive, although the program keeps running.

What is the best way to do this? Do we have to create a new consumer?

Thanks for any help in advance.

ptoews commented 3 months ago

I tried to additionally clear the subscribers first, but that didn't help. Then, I got it working by simply closing the consumer instead of just unsubscribing. However, I don't think this is a good solution, as it has to initiate a new connection.

I had a look at what close() does, and it seems it's just these three steps:

  1. unsubscribe
  2. remove subscribers
  3. close connection

but since manually doing just 1 and 2 didn't work, it seems to require closing the connection, which doesn't make sense to me. Surely I am missing something?

DanielePalaia commented 3 months ago

Hi @ptoews, thanks for reporting this. I will have a look, though it may take a while!

DanielePalaia commented 3 months ago

@ptoews It seems the issue occurs when we subscribe again with the same subscriber_name. I need to investigate why this is happening. Can you try subscribing in seek_somewhere with a different APP_ID?

ptoews commented 3 months ago

@DanielePalaia Yes, that did work, thank you!
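
For readers following along, the workaround confirmed above (unsubscribe, then subscribe again under a different subscriber_name) could be sketched roughly as follows. The helper names `next_subscriber_name` and `resubscribe_with_new_name` are hypothetical and not part of rstream; the sketch only assumes that the consumer object offers `unsubscribe` and `subscribe` with the keyword arguments used in the script at the top of this issue.

```python
import itertools

# Hypothetical counter used to derive a fresh subscriber name on every re-subscribe.
_generation = itertools.count(1)


def next_subscriber_name(base: str) -> str:
    """Return a name such as 'abcdef123-1', 'abcdef123-2', ... (illustrative scheme)."""
    return f"{base}-{next(_generation)}"


async def resubscribe_with_new_name(
    consumer,
    *,
    stream: str,
    old_name: str,
    base_name: str,
    offset_specification,
    callback,
    decoder=None,
) -> str:
    """Drop the old subscription and subscribe again under a different name.

    `consumer` is assumed to behave like rstream's Consumer as used in the
    original script; nothing beyond unsubscribe() and subscribe() is called.
    """
    await consumer.unsubscribe(old_name)
    new_name = next_subscriber_name(base_name)
    await consumer.subscribe(
        stream=stream,
        subscriber_name=new_name,
        callback=callback,
        decoder=decoder,
        offset_specification=offset_specification,
    )
    # The caller needs the new name to unsubscribe or seek again later.
    return new_name
```

In seek_somewhere, the unsubscribe/subscribe pair would then be replaced by a single call that passes the `offset` object built in the original script, with `old_name=APP_ID` and `base_name=APP_ID`. The returned name has to be kept for any later unsubscribe.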

DanielePalaia commented 3 months ago

@ptoews Great! I'll leave this issue open (and change the title), as there is probably a bug in the code.

DanielePalaia commented 3 months ago

Hi @ptoews, I implemented a fix in this PR https://github.com/qweeze/rstream/pull/199 that should allow you to use the same subscriber_name. I ran a few tests and it seems to be working fine. Could you also run a few tests against your previous scenario?

ptoews commented 3 months ago

@DanielePalaia Awesome, I did some quick initial tests and didn't find any issues so far.