mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0
1.18k stars 186 forks source link

How to use it correctly with py-cord? (Task was destroyed but it is pending) #535

Open ZentixUA opened 1 year ago

ZentixUA commented 1 year ago

Hello. Following the tutorial from the wiki (https://aio-pika.readthedocs.io/en/latest/rabbitmq-tutorial/1-introduction.html#receiving) I don't have any problems, but if I try to run main() via await in the on_ready() event, then the code will start giving errors like this:

Traceback (most recent call last):
    async with connection:
RuntimeError: coroutine ignored GeneratorExit
Ignoring exception in on_ready
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/discord/client.py", line 378, in _run_event
    await coro(*args, **kwargs)
  File "/home/host/servers/bots/adventblocks/bot.py", line 51, in on_ready
    await main()
RuntimeError: coroutine ignored GeneratorExit
Task was destroyed but it is pending!
task: <Task pending name='pycord: on_ready' coro=<Client._run_event() done, defined at /usr/local/lib/python3.10/dist-packages/discord/client.py:370> wait_for=<Future pending cb=[Task.task_wakeup()]>>
 [x] Received message IncomingMessage:{'app_id': None.......
mosquito commented 1 year ago

Please add your code example.

ZentixUA commented 1 year ago

@bot.event
async def on_ready():
    await main()

async def send_message(message: AbstractIncomingMessage) -> None:
    print(" [x] Received message %r" % message)
    print("Message body is: %r" % message.body)

    print("Before sleep!")
    await asyncio.sleep(5)  # Represents async I/O operations
    print("After sleep!")

async def main() -> None:
    # Perform connection
    connection = await connect("amqp://guest:guest@localhost/")
    async with connection:
        # Creating a channel
        channel = await connection.channel()

        # Declaring queue
        queue = await channel.declare_queue("request")

        # Start listening the queue with name 'hello'
        await queue.consume(send_message, no_ack=True)

        print(" [*] Waiting for messages. To exit press CTRL+C")
        await asyncio.Future()
ZentixUA commented 1 year ago

I used a slightly different way before, but it's unstable when the rabbitmq server is unavailable - if more than two times, then I get this error:

Connection attempt to "amqp://guest:******@localhost:5672//" failed: [Errno 111] Connect call failed ('127.0.0.1', 5672). Reconnecting after 5 seconds.
Task was destroyed but it is pending!
task: <Task pending name='pycord: on_ready' coro=<Client._run_event() running at /usr/local/lib/python3.10/dist-packages/discord/client.py:378> wait_for=<Future pending cb=[Task.task_wakeup()]>>
Connection attempt to "amqp://guest:******@localhost:5672//" failed: [Errno 111] Connect call failed ('127.0.0.1', 5672). Reconnecting after 5 seconds.
@bot.event
async def on_ready():
    await connect_rabbitmq()

async def send(message):
    ...

async def connect_rabbitmq():
    connection = await aio_pika.connect_robust(host="localhost")

    queue_name = "request"

    # Creating channel
    channel = await connection.channel()

    # Declaring queue
    queue = await channel.declare_queue(queue_name)

    async with queue.iterator() as queue_iter:
        async for message in queue_iter:
            async with message.process():
                await send(message.body)
mosquito commented 1 year ago

The first of all your main() should be a separate task IMHO.

Just add:

@bot.event
async def on_ready():
    task = asyncio.create_task(main())
    # if the your discord library supports long events just add awaiting of this task
    # await task
mosquito commented 1 year ago

But to be honest, I would advise you to look at aiomisc and run it as 2 separate aiomisc.Service

ZentixUA commented 1 year ago

Using task = asyncio.create_task(connect_rabbitmq())

Now I get this after first getting a message from rabbitmq:

Exception ignored in: <coroutine object connect_rabbitmq at 0x7faa2b35eb20>
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/discord/http.py", line 111, in __init__
    {
RuntimeError: coroutine ignored GeneratorExit
Task was destroyed but it is pending!
task: <Task pending name='Task-16' coro=<connect_rabbitmq() running at **/bot.py:225> wait_for=<Future pending cb=[Task.task_wakeup()]>>
async def send_message(message: AbstractIncomingMessage):
    nickname = json.loads(message.body.decode())["nickname"]
    data = json.loads(message.body.decode())["data"]

    embed = discord.Embed(...)
    await channel.send(...)
ZentixUA commented 1 year ago

Okay, I'll use my old method for now, but via task = asyncio.create_task(connect_rabbitmq())

What about reconnects? After 15-20 reconnects I get this:

Task was destroyed but it is pending!
task: <Task pending name='Task-16' coro=<connect_rabbitmq() running at /home/host/servers/bots/adventblocks/bot.py:236> wait_for=<Future pending cb=[Task.task_wakeup() ]>>
mosquito commented 1 year ago

I really don't understand the architecture of your service, but it looks like the event loop has been stopped.

You may use this example instead of aio_pika and, I guess, the result will be the same.

@bot.event
async def on_ready():
    try:
        await asyncio.sleep(86400)
    except:
        print("oooops")
    else:
        print("The mosquito was wrong")
ZentixUA commented 1 year ago

Miracles. Sometimes (after restarting the bot) this problem does not occur. Magic again

mosquito commented 1 year ago

Ever since I got an "D" in telepathy in high school, I don't believe in magic.

ZentixUA commented 1 year ago

this problem only occurs when reconnecting to some of the attempts SOMETIMES. Someone has ideas?

ZentixUA commented 1 year ago

The try: block prevents this problem from occurring.. Lol what

@bot.event
async def on_ready():
    await bot.change_presence(activity=discord.Game(name=".help"))
    bot.add_view(Requests())
    task = asyncio.create_task(connect_rabbitmq())

    print("One")

    try:
        await asyncio.sleep(1000)
    except:
        print("oooops")
        traceback.format_exc()
    else:
        print("The mosquito was wrong")

    print("Two")
ZentixUA commented 1 year ago

on my local PC everything seems to be fine. But it's not certain yet.

ZentixUA commented 1 year ago

no, the same thing on the test ubuntu 22.04 hyper-v instance

ZentixUA commented 1 year ago

Same thing with this one! On test host!

import asyncio
import json
import traceback

import aio_pika

async def send(message):
    nickname = json.loads(message.decode())["nickname"]
    data = json.loads(message.decode())["data"]
    print(f"Заявка на ник: {nickname}")

async def connect_rabbitmq():
    # logging.basicConfig(level=logging.DEBUG)
    connection = await aio_pika.connect_robust(host="localhost")
    print("Успешно подключено к RabbitMQ!")

    queue_name = "request"

    # Creating channel
    channel = await connection.channel()

    # Will take no more than 10 messages in advance

    # Declaring queue
    queue = await channel.declare_queue(queue_name)

    async with queue.iterator() as queue_iter:
        async for message in queue_iter:
            async with message.process():
                await send(message.body)

async def connect():
    asyncio.create_task(connect_rabbitmq())

    print("One")

    try:
        await asyncio.sleep(5000)
    except:
        print("oooops")
        traceback.format_exc()
    else:
        print("The mosquito was wrong")

    print("Two")

asyncio.run(connect())
Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<connect_rabbitmq() running at /home/main/test2.py:30> wait_for=<Future pending cb=[Task.task_wakeup()]>>

Even with a try: block

ZentixUA commented 1 year ago

But this time it took longer to get the problem

ZentixUA commented 1 year ago

This is 100% happening with the code above, check it yourself. Here is the log: custom.log