mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0
1.18k stars 186 forks source link

Best way to aggregate 2 queues? #518

Closed teplinsky-maxim closed 1 year ago

teplinsky-maxim commented 1 year ago

I have 2 queues, and I need to fetch a message from the first or the second (whichever returns first), and process it, then do it again, but for some reason on message.ack() Writer always exits, I tried to debug the issue, and seems like the problem is in this lines of code, at the moment of ack writer is already closed, so one of the coroutine closes and the code ends it's execution, but I need the code to infinitely process tasks. Also I asked the same question on stackoverflow

# aiormq/connection.py : 631
                        fp.write(
                            pamqp.frame.marshal(
                                frame, channel_frame.channel_number,
                            ),
                        )

My code:

import asyncio
import logging

import aio_pika
from aio_pika import IncomingMessage
from dotenv import load_dotenv

load_dotenv()

async def process(message: IncomingMessage):
    logging.info(message.body)
    await message.ack(message)
    print('here')

async def main():
    logging.basicConfig(level=logging.DEBUG)

    connection = await aio_pika.connect_robust(
        f"amqp://test_1:password@localhost:5672/vhost?heartbeat={180}"
    )

    channel = connection.channel()

    await channel.initialize(timeout=1000)
    await channel.set_qos(prefetch_count=1)

    queue1 = await channel.get_queue('q1')
    queue2 = await channel.get_queue('q2')

    q1_task = queue1.consume(process)
    q2_task = queue2.consume(process)

    await asyncio.gather(q1_task, q2_task)
    print(1)

if __name__ == '__main__':
    asyncio.run(main())

The log:

DEBUG:aiormq.connection:Connecting to: amqp://test_1:******@localhost:5672/vhost?heartbeat=180
DEBUG:aio_pika.connection:Creating AMQP channel for connection: <RobustConnection: "amqp://test_1:******@localhost:5672/vhost?heartbeat=180" 0 channels>
DEBUG:aio_pika.connection:Channel created: <aio_pika.robust_channel.RobustChannel object at 0x7fc6933c2920>
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=1, frames=[<Channel.Open object at 0x7fc6933fcf80>], drain_future=None)
DEBUG:aiormq.connection:Received frame <Channel.OpenOk object at 0x7fc6933fc200> in channel #1 weight=16 on <Connection: "amqp://test_1:******@localhost:5672/vhost?heartbeat=180" at 0x7fc6933c9e40>
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=1, frames=[<Confirm.Select object at 0x7fc693512fc0>], drain_future=None)
DEBUG:aiormq.connection:Received frame <Confirm.SelectOk object at 0x7fc693410070> in channel #1 weight=12 on <Connection: "amqp://test_1:******@localhost:5672/vhost?heartbeat=180" at 0x7fc6933c9e40>
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=1, frames=[<Basic.Qos object at 0x7fc693557ab0>], drain_future=None)
DEBUG:aiormq.connection:Received frame <Basic.QosOk object at 0x7fc693410190> in channel #1 weight=12 on <Connection: "amqp://test_1:******@localhost:5672/vhost?heartbeat=180" at 0x7fc6933c9e40>
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=1, frames=[<Basic.Qos object at 0x7fc693408ef0>], drain_future=None)
DEBUG:aiormq.connection:Received frame <Basic.QosOk object at 0x7fc693410130> in channel #1 weight=12 on <Connection: "amqp://test_1:******@localhost:5672/vhost?heartbeat=180" at 0x7fc6933c9e40>
DEBUG:aio_pika.queue:Declaring queue: <RobustQueue(test_task_in): auto_delete=False, durable=False, exclusive=False, arguments=None
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=1, frames=[<Queue.Declare object at 0x7fc6933e0120>], drain_future=None)
DEBUG:aiormq.connection:Received frame <Queue.DeclareOk object at 0x7fc6934098a0> in channel #1 weight=42 on <Connection: "amqp://test_1:******@localhost:5672/vhost?heartbeat=180" at 0x7fc6933c9e40>
DEBUG:aio_pika.queue:Declaring queue: <RobustQueue(test_2_task_in): auto_delete=False, durable=False, exclusive=False, arguments=None
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=1, frames=[<Queue.Declare object at 0x7fc6933e0970>], drain_future=None)
DEBUG:aiormq.connection:Received frame <Queue.DeclareOk object at 0x7fc693409b70> in channel #1 weight=44 on <Connection: "amqp://test_1:******@localhost:5672/vhost?heartbeat=180" at 0x7fc6933c9e40>
DEBUG:aio_pika.queue:Start to consuming queue: <RobustQueue(test_task_in): auto_delete=False, durable=False, exclusive=False, arguments=None
DEBUG:aio_pika.queue:Start to consuming queue: <RobustQueue(test_2_task_in): auto_delete=False, durable=False, exclusive=False, arguments=None
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=1, frames=[<Basic.Consume object at 0x7fc6933e0200>], drain_future=None)
DEBUG:aiormq.connection:Received frame <Basic.ConsumeOk object at 0x7fc6933fe500> in channel #1 weight=51 on <Connection: "amqp://test_1:******@localhost:5672/vhost?heartbeat=180" at 0x7fc6933c9e40>
DEBUG:aiormq.connection:Received frame <Basic.Deliver object at 0x7fc6933f4640> in channel #1 weight=83 on <Connection: "amqp://test_1:******@localhost:5672/vhost?heartbeat=180" at 0x7fc6933c9e40>
DEBUG:aiormq.connection:Received frame <pamqp.header.ContentHeader object at 0x7fc6934106d0> in channel #1 weight=61 on <Connection: "amqp://test_1:******@localhost:5672/vhost?heartbeat=180" at 0x7fc6933c9e40>
DEBUG:aiormq.connection:Received frame <pamqp.body.ContentBody object at 0x7fc693410850> in channel #1 weight=16 on <Connection: "amqp://test_1:******@localhost:5672/vhost?heartbeat=180" at 0x7fc6933c9e40>
INFO:root:b'{"3": 4}'
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=1, frames=[<Basic.Consume object at 0x7fc6933e0ac0>], drain_future=None)
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=1, frames=[<Basic.Ack object at 0x7fc693938100>], drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /home/vovan/PycharmProjects/w-test/venv/lib/python3.10/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
DEBUG:aiormq.connection:Writer exited for <Connection: "amqp://test_1:******@localhost:5672/vhost?heartbeat=180" at 0x7fc6933c9e40>
1
DEBUG:aiormq.connection:Received frame <Basic.ConsumeOk object at 0x7fc6933fef00> in channel #1 weight=51 on <Connection: "amqp://test_1:******@localhost:5672/vhost?heartbeat=180" at 0x7fc6933c9e40>
DEBUG:aiormq.connection:Reader exited for <Connection: "amqp://test_1:******@localhost:5672/vhost?heartbeat=180" at 0x7fc6933c9e40>
DEBUG:aiormq.connection:Closing connection <Connection: "amqp://test_1:******@localhost:5672/vhost?heartbeat=180" at 0x7fc6933c9e40> cause: CancelledError()
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-26' coro=<OneShotCallback.__task_inner() running at /home/vovan/PycharmProjects/w-test/venv/lib/python3.10/site-packages/aio_pika/tools.py:230>>
/usr/lib/python3.10/asyncio/base_events.py:671: RuntimeWarning: coroutine 'OneShotCallback.__task_inner' was never awaited
  self._ready.clear()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-19' coro=<Connection._on_reader_done.<locals>.close_writer_task() running at /home/vovan/PycharmProjects/w-test/venv/lib/python3.10/site-packages/aiormq/connection.py:506> wait_for=<Future finished result=None>>
sys:1: RuntimeWarning: coroutine 'OneShotCallback.__task_inner' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-25' coro=<OneShotCallback.__task_inner() running at /home/vovan/PycharmProjects/w-test/venv/lib/python3.10/site-packages/aio_pika/tools.py:230>>
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-27' coro=<OneShotCallback.__task_inner() running at /home/vovan/PycharmProjects/w-test/venv/lib/python3.10/site-packages/aio_pika/tools.py:230>>
teplinsky-maxim commented 1 year ago

I found out the problem. await message.ack(message) caused the issue. ack() takes no argument... The code I wrote is ok