qweeze / rstream

A Python asyncio-based client for RabbitMQ Streams
MIT License

Fixed a bug where, after unsubscribing, the next subscriber with the same subscriber name used the first subscriber's callback and ignored the second subscriber's callback. #204

Closed by nesb1 2 months ago

nesb1 commented 2 months ago

Good day!

I found a bug and would like to suggest a solution. Here is a minimal reproduction:

import asyncio

from rstream import AMQPMessage, Consumer, MessageContext, Producer

async def main():
    consumer = Consumer(host="rabbitmq", username="guest", password="guest")

    producer = Producer(host="rabbitmq", username="guest", password="guest")

    await consumer.start()

    async def cb1(message: AMQPMessage, context: MessageContext):
        print(f"cb1: {context.offset}")

    async def cb2(message: AMQPMessage, context: MessageContext):
        print(f"cb2: {context.offset}")

    await consumer.create_stream(stream="kek", exists_ok=True)

    subscriber_name1 = await consumer.subscribe("kek", cb1)

    await producer.send_wait(stream="kek", message=b"123")

    await asyncio.sleep(5)

    await consumer.unsubscribe(subscriber_name=subscriber_name1)

    subscriber_name2 = await consumer.subscribe("kek", cb2)

    await asyncio.sleep(5)

asyncio.run(main())

Expected output for this code:

cb1: 0
cb2: 0

Actual output:

cb1: 0
cb1: 0
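
The thread does not show rstream's internals, so the following is only a toy model of how this kind of bug can arise. It assumes callbacks are stored in a dict keyed by subscriber name, that names are generated deterministically, and that unsubscribing leaves the old entry behind. `ToyConsumer`, `drop_on_unsubscribe`, `deliver`, and `run` are hypothetical names and are not part of rstream.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

Callback = Callable[[int], Awaitable[None]]


class ToyConsumer:
    """Hypothetical stand-in for a consumer; NOT rstream's real implementation."""

    def __init__(self, drop_on_unsubscribe: bool) -> None:
        self._callbacks: Dict[str, Callback] = {}
        self._drop_on_unsubscribe = drop_on_unsubscribe

    async def subscribe(self, stream: str, callback: Callback) -> str:
        # Assumption: the generated name is deterministic, so it repeats
        # once the previous subscriber is gone.
        name = f"{stream}_subscriber_1"
        # setdefault keeps an existing entry, so a stale callback wins.
        self._callbacks.setdefault(name, callback)
        return name

    async def unsubscribe(self, name: str) -> None:
        # The "fixed" variant removes the stored callback; the buggy one does not.
        if self._drop_on_unsubscribe:
            self._callbacks.pop(name, None)

    async def deliver(self, name: str, offset: int) -> None:
        # Simulates a message arriving for the named subscriber.
        await self._callbacks[name](offset)


async def run(drop_on_unsubscribe: bool) -> List[str]:
    seen: List[str] = []

    async def cb1(offset: int) -> None:
        seen.append(f"cb1: {offset}")

    async def cb2(offset: int) -> None:
        seen.append(f"cb2: {offset}")

    consumer = ToyConsumer(drop_on_unsubscribe)
    name1 = await consumer.subscribe("kek", cb1)
    await consumer.deliver(name1, 0)
    await consumer.unsubscribe(name1)
    name2 = await consumer.subscribe("kek", cb2)
    await consumer.deliver(name2, 0)
    return seen


buggy = asyncio.run(run(drop_on_unsubscribe=False))
fixed = asyncio.run(run(drop_on_unsubscribe=True))
print(buggy)  # ['cb1: 0', 'cb1: 0']
print(fixed)  # ['cb1: 0', 'cb2: 0']
```

In this sketch, the buggy variant reproduces the actual output above and the variant that clears the entry on unsubscribe reproduces the expected output.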

Gsantomaggio commented 2 months ago

Thank you, @nesb1. We will have a look as soon as possible.

DanielePalaia commented 2 months ago

Looks good to me. Thanks for the fix and the tests. If you are done with this, we can merge.

nesb1 commented 2 months ago

I am done, let's merge it.