mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

RabbitMQ Streams: x-stream-offset issue with aio-pika version 6.8.2 #487

Open aman16sachdeva opened 1 year ago

aman16sachdeva commented 1 year ago

Hi,

I'm working on a project where I'm stuck with Python 3.5, no choice there, and the latest aio-pika version available for Python 3.5 is 6.8.2.

Using aio-pika, I'm trying to declare and consume from RabbitMQ Streams (connecting, declaring, and consuming over the AMQP protocol, of course), but while doing so I'm running into a strange issue.

While consuming, if I provide arguments: {'x-stream-offset': 128}, the consumer actually gets initialised with arguments: {'x-stream-offset': -128} (note the negative sign). This makes the consumer effectively start from the beginning of the Stream.

So if the offset is 128, the consumer gets initialised with 128 - 256 = -128; if it's 129, the consumer gets initialised with 129 - 256 = -127; ... and 255 results in 255 - 256 = -1.

This problem occurs with offset values from 128 to 255. From 0 to 127, and from 256 upwards, the behaviour is normal. Can someone help me identify the root cause of this issue, or can it somehow be fixed, or is there a workaround?
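The 128 → -128, 255 → -1 pattern looks exactly like a single byte being written unsigned and read back signed (two's complement), which would also explain why values from 256 upwards are unaffected (they presumably take a wider integer encoding). A minimal sketch of that reinterpretation, independent of aio-pika/pamqp:

```python
import struct

def reinterpret_as_signed_byte(value):
    """Pack value as an unsigned byte, then unpack it as a signed byte."""
    return struct.unpack('b', struct.pack('B', value))[0]

for value in (0, 127, 128, 129, 255):
    print(value, '->', reinterpret_as_signed_byte(value))
# 0 -> 0, 127 -> 127, 128 -> -128, 129 -> -127, 255 -> -1
```

This reproduces the reported arithmetic (value - 256 for 128..255) exactly, which is why the suspicion falls on the integer serialization path rather than on RabbitMQ itself.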

The same issue does not occur with aio-pika version 7.2.0, nor with the latest version.

Code (taken directly from aio-pika 6.8.2's documentation > Quick Start > Connection Pooling), with a few tweaks to make it work with RabbitMQ Streams:

import asyncio

import aio_pika
from aio_pika.pool import Pool

async def main():
    loop = asyncio.get_event_loop()

    async def get_connection():
        amqp_url = "amqp://guest:guest@localhost/"  # Change Accordingly
        return await aio_pika.connect_robust(amqp_url)

    connection_pool = Pool(get_connection, max_size=2, loop=loop)

    async def get_channel() -> aio_pika.Channel:
        async with connection_pool.acquire() as connection:
            return await connection.channel()

    channel_pool = Pool(get_channel, max_size=10, loop=loop)
    queue_name = "pool_queue"

    async def consume():
        async with channel_pool.acquire() as channel:  # type: aio_pika.Channel
            await channel.set_qos(10)

            queue = await channel.declare_queue(
                queue_name,
                durable=True,
                auto_delete=False,
                arguments={'x-queue-type': 'stream'}  # For Stream
            )

            OFFSET = 128  # Change anywhere from 0-127, then 128-255, then 256 and onwards
            async with queue.iterator(arguments={'x-stream-offset': OFFSET}) as queue_iter:
                async for message in queue_iter:
                    print(message)
                    await message.ack()

    async def publish():
        async with channel_pool.acquire() as channel:  # type: aio_pika.Channel
            await channel.default_exchange.publish(
                aio_pika.Message(("Channel: %r" % channel).encode()),
                queue_name,
            )

    async with connection_pool, channel_pool:
        task = loop.create_task(consume())
        await asyncio.wait([publish() for _ in range(100)])  # Reduced the number of messages
        await task

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

Stack Overflow Question's link
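Since the bug only bites integer offsets in the 128-255 range, one possible workaround while stuck on 6.8.2 (untested there, and assuming the non-integer paths serialize correctly) is to use the other offset forms RabbitMQ Streams accept, which avoid the single-byte integer encoding entirely:

```python
import datetime

# Named offsets are serialized as strings: 'first', 'last', or 'next'.
named_args = {'x-stream-offset': 'first'}

# A datetime is serialized as an AMQP timestamp, so it also never
# goes through the integer encoding path.
ts_args = {'x-stream-offset': datetime.datetime(2024, 1, 1)}

# Usage would be e.g. queue.iterator(arguments=named_args)
print(named_args)
print(ts_args)
```

Neither form gives the exact "start at message N" semantics of a numeric offset, so this is only a partial substitute.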

mosquito commented 1 year ago

I planned to support Python 3.5 for as long as I could; however, pamqp discontinued support for it as of version 3.0.0.

The error you're seeing is a serialization problem; that is, the root of the problem is somewhere in pamqp.

If you want my opinion, I would strongly recommend making the effort to update Python: if you invest the resources to upgrade to version 3.10, you will receive security updates until 04 Oct 2026.

Unfortunately, the world is not perfect; more and more problems will be found in older versions beyond those known now, and since the community has already dropped support, you will be left alone with them.