LonamiWebs / Telethon

Pure Python 3 MTProto API Telegram client library, for bots too!
https://docs.telethon.dev
MIT License

Task was destroyed but it is pending #1634

Closed: slavnycoder closed this issue 2 years ago

slavnycoder commented 3 years ago

Code that causes the issue

import asyncio
from typing import List, Optional

from alchemysession import AlchemySessionContainer  # telethon-session-sqlalchemy
from telethon import TelegramClient
from telethon.tl.functions.messages import GetHistoryRequest
from telethon.tl.types import InputPeerChannel

# `app` (the Celery app), `cache`, `Author`, `get_channel` and the TLG_* settings
# are defined elsewhere in the project and omitted here.

@app.task(bind=True)
def crawl_channels(self):
    async def _crawl_channels(_client: TelegramClient):
        authors: List[Author]  # populated elsewhere (elided in this report)
        for channel_obj in authors:
            channel: Optional[InputPeerChannel] = await get_channel(_client, channel_obj)
            if channel is None:
                continue
            posts = await _client(GetHistoryRequest(
                peer=channel,
                limit=0 if channel_obj.last_msg_id else 10,
                offset_date=None,
                offset_id=0,
                max_id=0,
                min_id=channel_obj.last_msg_id,
                add_offset=0,
                hash=0
            ))
            if posts.messages:
                for msg in posts.messages:
                    await _client.download_media(msg)
            await asyncio.sleep(1)  # avoid flood waits without blocking the event loop

    with cache.lock(f"{self.name}-lock"):
        tlg_session_db_url = "postgres://{USER}:{PASSWORD}@{HOST}:{PORT}/{NAME}".format(**TLG_SESSION_DB_URL)
        container = AlchemySessionContainer(tlg_session_db_url)
        session = container.new_session(TLG_SESSION_NAME)
        with TelegramClient(session, TLG_API_ID, TLG_API_HASH) as client:
            client.loop.run_until_complete(_crawl_channels(client))

Traceback

Task was destroyed but it is pending!
task: <Task pending name='Task-254' coro=<Connection._send_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py:167> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f4d62819e20>()]>>
asctime: 2020-11-16 09:27:07,916

Task was destroyed but it is pending!
task: <Task pending name='Task-255' coro=<Connection._recv_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py:185> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f4d62825ac0>()]>>
asctime: 2020-11-16 09:27:07,922

Task was destroyed but it is pending!
task: <Task pending name='Task-256' coro=<MTProtoSender._send_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py:442> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f4d627a2130>()]>>

The problem occurs in a Celery task that aggregates media from some channels. Previously I tried the "sync" version of TelegramClient and got the same errors.
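
A diagnostic aside (not a fix, and not part of the original report): enabling asyncio's debug mode makes each "Task was destroyed but it is pending!" entry also include a source_traceback showing where the pending task was created, which helps tell the library's tasks apart from your own.

import asyncio

# Either start the worker with PYTHONASYNCIODEBUG=1, or enable debug mode on the
# loop before the client connects. Tasks created afterwards record where they
# were created, and that traceback is appended to the destroy-pending warning.
asyncio.get_event_loop().set_debug(True)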

hagemouse commented 3 years ago

I have the same problem; in my case it happens inside the "for file in files" loop:

from telethon.tl.types import InputMessagesFilterDocument

files = await client.get_messages(channel, None, filter=InputMessagesFilterDocument)
if files is not None:
    for file in files:
        attributes = file.media.document.attributes

slavnycoder commented 3 years ago

This may help you:

import asyncio

client.loop.run_until_complete(payload(client))
# asyncio.Task.all_tasks() was removed in Python 3.9; asyncio.all_tasks() is the replacement
for task in asyncio.all_tasks(client.loop):
    task.cancel()

Lonami commented 3 years ago

That's not a solution; the library should clean up all the tasks it creates when the client is disconnected, so let's keep this open until it's solved. It is a workaround to get rid of the warnings, though.

Lonami commented 3 years ago

This fixes one of the problems leading to this warning. I am not sure if there are more, as I was fixing something else, not purposely trying to fix this.

slavnycoder commented 3 years ago

Still occurs

Lonami commented 3 years ago

Perhaps the library could give each task a more descriptive name, which could help spot the issue. Unfortunately, the task name is a Python 3.8+ feature: https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
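
For illustration only (this is not something Telethon does today): the name parameter of asyncio.create_task on Python 3.8+ is what shows up in these warnings, so a descriptive name makes them much easier to attribute.

import asyncio

async def send_loop():
    await asyncio.sleep(3600)  # stand-in for a long-running internal loop

async def main():
    # The name appears in repr(task) and in "Task was destroyed but it is pending!" warnings.
    task = asyncio.create_task(send_loop(), name="Connection._send_loop (main dc)")
    await asyncio.sleep(0)
    print(task)  # <Task pending name='Connection._send_loop (main dc)' ...>
    task.cancel()

asyncio.run(main())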

slavnycoder commented 3 years ago

Maybe this will help.

task.print_stack() output:

Stack for <Task pending name='Task-12' coro=<Connection._send_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py:306> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py", line 306, in _send_loop
    self._send(await self._send_queue.get())
Stack for <Task pending name='Task-63' coro=<Connection._recv_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py:324> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py", line 324, in _recv_loop
    data = await self._recv()
Stack for <Task pending name='Task-6' coro=<MTProtoSender._recv_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py:485> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py", line 485, in _recv_loop
    body = await self._connection.recv()
Stack for <Task pending name='Task-3' coro=<Connection._send_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py:306> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py", line 306, in _send_loop
    self._send(await self._send_queue.get())
Stack for <Task pending name='Task-13' coro=<Connection._recv_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py:324> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py", line 324, in _recv_loop
    data = await self._recv()
Stack for <Task pending name='Task-64' coro=<MTProtoSender._send_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py:442> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py", line 442, in _send_loop
    batch, data = await self._send_queue.get()
Stack for <Task pending name='Task-7' coro=<UpdateMethods._update_loop() running at /usr/local/lib/python3.8/site-packages/telethon/client/updates.py:328> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/client/updates.py", line 328, in _update_loop
    await asyncio.wait_for(
Stack for <Task pending name='Task-4' coro=<Connection._recv_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py:324> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py", line 324, in _recv_loop
    data = await self._recv()
Stack for <Task pending name='Task-14' coro=<MTProtoSender._send_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py:442> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py", line 442, in _send_loop
    batch, data = await self._send_queue.get()
Stack for <Task pending name='Task-65' coro=<MTProtoSender._recv_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py:485> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py", line 485, in _recv_loop
    body = await self._connection.recv()
Stack for <Task pending name='Task-62' coro=<Connection._send_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py:306> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py", line 306, in _send_loop
    self._send(await self._send_queue.get())
Stack for <Task pending name='Task-5' coro=<MTProtoSender._send_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py:442> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py", line 442, in _send_loop
    batch, data = await self._send_queue.get()
Stack for <Task pending name='Task-15' coro=<MTProtoSender._recv_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py:485> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py", line 485, in _recv_loop
    body = await self._connection.recv()
maximusfox commented 2 years ago

Try calling client.disconnect() manually at the end.
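
A minimal sketch of that suggestion (API_ID, API_HASH and do_work are placeholders, not code from this thread): awaiting disconnect() in a finally block before the event loop goes away lets Telethon cancel and finish its internal send/receive tasks.

import asyncio
from telethon import TelegramClient

async def main():
    client = TelegramClient("session", API_ID, API_HASH)  # credentials assumed to be defined elsewhere
    await client.connect()
    try:
        await do_work(client)  # hypothetical placeholder for your own logic
    finally:
        await client.disconnect()  # shut down Telethon's internal loops before the event loop closes

asyncio.run(main())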

MaxSSD commented 2 years ago

Any fixes?

Adophilus commented 2 years ago

Thanks @maximusfox, this fixes the issue for me.

Lonami commented 1 year ago

To anyone else having this issue:

If you spawn a task (with asyncio.create_task or similar APIs), and that task creates TelegramClient instances, you are likely to receive this error if the task your code spawned is destroyed:

Task was destroyed but it is pending!
task: <Task pending name='Task-201' coro=<YOUR_OWN_METHOD() done, defined at C:\...\example.py:1234> wait_for=<Future pending cb=[Task.task_wakeup()]>>
Task was destroyed but it is pending!
task: <Task pending name='Task-208' coro=<Connection._send_loop() done, defined at C:\...\telethon\network\connection\connection.py:305> wait_for=<Future cancelled>>
Task was destroyed but it is pending!
task: <Task pending name='Task-209' coro=<Connection._recv_loop() done, defined at C:\...\telethon\network\connection\connection.py:323> wait_for=<Future pending cb=[Task.task_wakeup()]>>
Task was destroyed but it is pending!
task: <Task pending name='Task-210' coro=<MTProtoSender._send_loop() done, defined at C:\...\telethon\network\mtprotosender.py:440> wait_for=<Future pending cb=[Task.task_wakeup()]>>
Task was destroyed but it is pending!
task: <Task pending name='Task-211' coro=<MTProtoSender._recv_loop() done, defined at C:\...\telethon\network\mtprotosender.py:491> wait_for=<Future cancelled>>

Note how YOUR_OWN_METHOD will be printed first (because it's the "root" task from which others were spawned).

To prevent "task destroyed" from happening, make sure it's not being garbage-collected. This can mean storing it in a list or other containers:

# likely to trigger the warning messages eventually
asyncio.create_task(YOUR_OWN_METHOD())

# less prone to trigger the messages because a reference to the returned task is kept
tasks = []
tasks.append(asyncio.create_task(YOUR_OWN_METHOD()))

(Obviously, depending on your case, the simple code above may grow the list forever, which is undesirable.)

Ideally, try to follow the principle of structured concurrency. Structure your tasks like a tree (with asyncio.gather, asyncio.wait, and so on), and not just separate threads of execution.
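
A sketch of that structure (the worker names here are illustrative, not from this thread): keep a reference to every task you spawn and await them together, so nothing is left dangling for the garbage collector to destroy.

import asyncio

async def worker(n: int) -> None:
    await asyncio.sleep(n)  # stand-in for per-channel work with a TelegramClient

async def main() -> None:
    # gather() keeps references to all tasks and waits for them to finish,
    # so none of them can be garbage-collected while still pending.
    await asyncio.gather(*(worker(n) for n in range(3)))

    # On Python 3.11+, asyncio.TaskGroup expresses the same tree-shaped structure:
    # async with asyncio.TaskGroup() as tg:
    #     for n in range(3):
    #         tg.create_task(worker(n))

asyncio.run(main())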

yyystation commented 1 year ago

As you mentioned, I encountered this issue when using asyncio.create_task to create tasks. In my handle_kafka_message method, I create a client object and go through the process of creating a Telegram session.

Just as you said, storing each task in a dictionary indeed resolved this problem:

async for message in self.kafka_consumer.consumer:
    print(message)
    timestamp = message.timestamp
    self.task[timestamp] = asyncio.create_task(self.handle_kafka_message(message))
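
If the dictionary only ever grows, one option (an assumption layered on the snippet above, with hypothetical names such as KafkaBridge and spawn) is to let each task drop its own reference once it finishes:

import asyncio

class KafkaBridge:
    def __init__(self):
        self.tasks = {}  # timestamp -> asyncio.Task

    def spawn(self, timestamp, coro):
        # Keep a strong reference while the task is pending so it cannot be
        # garbage-collected, and drop it automatically once the task is done.
        task = asyncio.create_task(coro)
        self.tasks[timestamp] = task
        task.add_done_callback(lambda _t, ts=timestamp: self.tasks.pop(ts, None))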