ronf / asyncssh

AsyncSSH is a Python package which provides an asynchronous client and server implementation of the SSHv2 protocol on top of the Python asyncio framework.
Eclipse Public License 2.0

task hangs when another exception is raised when options.waiter is cancelled. #634

Open grvstick opened 8 months ago

grvstick commented 8 months ago

AsyncSSH 2.14.1, Linux Mint 21.3 (Ubuntu Jammy), Python 3.12.0

The issue is reproducible using the following code:

import asyncio
from traceback import print_exc
from typing import TypedDict

import asyncssh

ops_config = {
 #config dict
}

class SshCred(TypedDict):
    host: str
    username: str
    password: str

class Config(TypedDict):
    ssh: SshCred

QUERY = "Some Query"

class ErrorCase:
    def __init__(
        self, config: Config
    ) -> None:
        cfg: Config = config

        self._ssh_cred: SshCred = cfg["ssh"]
        self.task_q: asyncio.Queue[tuple[str, asyncio.Queue]] = asyncio.Queue()
        self.terminate = asyncio.Event()

    async def query_task(self):
        listener: None | asyncssh.SSHListener
        try:
            async with asyncssh.connect(**self._ssh_cred) as conn:
                listener = await conn.forward_local_port("", 3306, "localhost", 3306)
                raise ZeroDivisionError
        finally:
            if listener is not None:
                listener.close()
                await listener.wait_closed()

    async def query(self, query: str):
        loop = asyncio.get_running_loop()
        resp_q = asyncio.Queue()

        await self.task_q.put((query, resp_q))

        try:
            async with asyncio.timeout(5) as cm:
                await self.task_q.join()
                cm.reschedule(loop.time() + 1)
                return await resp_q.get()
        except TimeoutError:
            return

    async def main_task(self):
        while not self.terminate.is_set():
            try:
                await self.query_task()
            except ZeroDivisionError:
                pass
            except Exception:
                print_exc()
            finally:
                await asyncio.sleep(3)

async def main():
    db = ErrorCase(ops_config)
    task = asyncio.create_task(db.main_task())
    for _ in range(3):
        print(await db.query(QUERY))

    # db.terminate.set()
    task.cancel()
    await task

asyncio.run(main())

Running this code produces the traceback below. The traceback itself is not surprising, but the program then hangs indefinitely.

None
None
None
Traceback (most recent call last):
  File "/home/spencer/repos/local/python/ops_db/error_reproduce.py", line 42, in query_task
    async with asyncssh.connect(**self._ssh_cred) as conn:
  File "/home/spencer/.pyenv/versions/3.12.0/envs/test-12/lib/python3.12/site-packages/asyncssh/misc.py", line 274, in __aenter__
    self._coro_result = await self._coro
                        ^^^^^^^^^^^^^^^^
  File "/home/spencer/.pyenv/versions/3.12.0/envs/test-12/lib/python3.12/site-packages/asyncssh/connection.py", line 8231, in connect
    return await asyncio.wait_for(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/spencer/.pyenv/versions/3.12.0/lib/python3.12/asyncio/tasks.py", line 510, in wait_for
    return await fut
           ^^^^^^^^^
  File "/home/spencer/.pyenv/versions/3.12.0/envs/test-12/lib/python3.12/site-packages/asyncssh/connection.py", line 441, in _connect
    await options.waiter
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/spencer/repos/local/python/ops_db/error_reproduce.py", line 67, in main_task
    await self.query_task()
  File "/home/spencer/repos/local/python/ops_db/error_reproduce.py", line 46, in query_task
    if listener is not None:
       ^^^^^^^^
UnboundLocalError: cannot access local variable 'listener' where it is not associated with a value
^CTraceback (most recent call last):
  File "/home/spencer/repos/local/python/ops_db/error_reproduce.py", line 42, in query_task
    async with asyncssh.connect(**self._ssh_cred) as conn:
  File "/home/spencer/.pyenv/versions/3.12.0/envs/test-12/lib/python3.12/site-packages/asyncssh/misc.py", line 274, in __aenter__
    self._coro_result = await self._coro
                        ^^^^^^^^^^^^^^^^
  File "/home/spencer/.pyenv/versions/3.12.0/envs/test-12/lib/python3.12/site-packages/asyncssh/connection.py", line 8231, in connect
    return await asyncio.wait_for(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/spencer/.pyenv/versions/3.12.0/lib/python3.12/asyncio/tasks.py", line 510, in wait_for
    return await fut
           ^^^^^^^^^
  File "/home/spencer/.pyenv/versions/3.12.0/envs/test-12/lib/python3.12/site-packages/asyncssh/connection.py", line 441, in _connect
    await options.waiter
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/spencer/repos/local/python/ops_db/error_reproduce.py", line 67, in main_task
    await self.query_task()
  File "/home/spencer/repos/local/python/ops_db/error_reproduce.py", line 46, in query_task
    if listener is not None:
       ^^^^^^^^
UnboundLocalError: cannot access local variable 'listener' where it is not associated with a value

When I ran it under a debugger, main_task was not cancelled and kept running the loop.

ronf commented 8 months ago

I'm not sure this is an AsyncSSH problem.

Even though you are canceling main_task(), you have a catch-all "except Exception" in there, which I think will end up capturing the CancelledError from your task.cancel(), and that will prevent the task from being cancelled. Instead, it will go back up and check if self.terminate is set, but it won't be since you have that commented out.

As for the secondary error, that's because you are successfully cancelling the asyncssh.connect() call in query_task, and that causes it to attempt to run the "finally" block which references listener, but listener is only assigned in the case where the asyncssh.connect() succeeds. You should probably move the "try" to be after listener is assigned if you want to refer to it in the "finally" block, or you should assign its initial value to be None prior to the "try".
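The second suggestion above (binding listener to None before the "try") can be sketched without a real SSH server. This is a minimal illustration, not the AsyncSSH API: fake_connect() is a hypothetical stand-in for asyncssh.connect(), and a plain object() stands in for the listener.

```python
import asyncio


async def fake_connect():
    # Hypothetical stand-in for asyncssh.connect(): it only waits, so it
    # can be cancelled while the "connection" is still in progress.
    await asyncio.sleep(10)
    return object()


async def query_task():
    listener = None  # bound before the try, so the finally block is safe
    try:
        await fake_connect()
        listener = object()  # stand-in for conn.forward_local_port(...)
    finally:
        if listener is not None:
            pass  # the real code would close the listener here


async def main():
    task = asyncio.create_task(query_task())
    await asyncio.sleep(0.1)  # let the task reach fake_connect()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


was_cancelled = asyncio.run(main())
print(was_cancelled)
```

Because the "finally" block no longer raises, the CancelledError propagates unchanged and the task ends up in the cancelled state.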

I haven't tested it, but if you put in an except CancelledError in main_task which raises the error to prevent the except Exception from capturing it, I think it could fix the problem. Alternately, add a raise after print_exc() in the except Exception block, or explicitly break out of the loop when you see a CancelledError.

grvstick commented 8 months ago

Thanks for the swift reply. Here are a few comments:

> Even though you are canceling main_task(), you have a catch-all "except Exception" in there, which I think will end up capturing the CancelledError from your task.cancel(), and that will prevent the task from being cancelled. Instead, it will go back up and check if self.terminate is set, but it won't be since you have that commented out.

As far as I know, that was the behavior in older Python versions. Since Python 3.8, asyncio.CancelledError derives from BaseException rather than Exception, so "except Exception" no longer catches it. Moreover, changing main_task as follows does not change the behavior:

    async def main_task(self):
        while not self.terminate.is_set():
            try:
                await self.query_task()
            except ZeroDivisionError:
                pass
            except asyncio.CancelledError:
                raise
            except Exception:
                print_exc()
            finally:
                await asyncio.sleep(3)
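The point about the exception hierarchy can be checked directly. A minimal sketch, assuming Python 3.8 or later (the helper name caught_by_except_exception is made up for this illustration):

```python
import asyncio

# On Python 3.8+, CancelledError is a BaseException, not an Exception.
is_exception = issubclass(asyncio.CancelledError, Exception)
is_base_exception = issubclass(asyncio.CancelledError, BaseException)


def caught_by_except_exception() -> bool:
    """Return True if `except Exception` catches a CancelledError."""
    try:
        try:
            raise asyncio.CancelledError
        except Exception:
            return True
    except asyncio.CancelledError:
        return False


print(is_exception, is_base_exception, caught_by_except_exception())
```

On those versions this prints `False True False`, which is why a catch-all "except Exception" does not, by itself, swallow a cancellation.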

> As for the secondary error, that's because you are successfully cancelling the asyncssh.connect() call in query_task, and that causes it to attempt to run the "finally" block which references listener, but listener is only assigned in the case where the asyncssh.connect() succeeds. You should probably move the "try" to be after listener is assigned if you want to refer to it in the "finally" block, or you should assign its initial value to be None prior to the "try".

Omitting the listener initialization was my mistake, and I have fixed it in the code I actually use. I left it in here because it reproduces the problem. Even with the unbound local variable mistake, the task should still be cancelled when it is asked to cancel, or at least that is what I think.
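One reading that is consistent with the traceback above, though not confirmed in this thread, is that the UnboundLocalError raised inside the "finally" block replaces the in-flight CancelledError. main_task then sees an ordinary Exception, logs it, and keeps looping, so the cancellation request is lost. The sketch below reproduces that pattern with plain asyncio only: asyncio.sleep() stands in for asyncssh.connect(), and the loop is bounded to two rounds (an assumption made so the example terminates).

```python
import asyncio


async def query_task():
    try:
        await asyncio.sleep(10)   # stand-in for asyncssh.connect(); cancelled here
        listener = object()       # never reached when cancelled
    finally:
        if listener is not None:  # raises UnboundLocalError during cancellation
            pass


async def main_task(log, rounds=2):
    for _ in range(rounds):       # bounded so the sketch terminates
        try:
            await query_task()
        except asyncio.CancelledError:
            raise                 # never triggered: a different exception arrives
        except Exception as exc:
            log.append(type(exc).__name__)


async def main():
    log = []
    task = asyncio.create_task(main_task(log))
    await asyncio.sleep(0.1)
    task.cancel()                 # first cancel request, as in the report
    await asyncio.sleep(0.1)
    survived = not task.done()    # the task is still running
    task.cancel()                 # second request is swallowed the same way
    await task                    # the loop ends normally after two rounds
    return survived, task.cancelled(), log


survived, cancelled, log = asyncio.run(main())
print(survived, cancelled, log)
```

In this sketch the task survives the first cancel() and never reaches the cancelled state, even with the explicit "except asyncio.CancelledError: raise" clause, because the exception that reaches main_task is the UnboundLocalError. A task that loops forever instead of for two rounds would leave "await task" waiting indefinitely.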

ronf commented 4 months ago

Sorry - it looks like I missed your last post here. Were you able to resolve this issue?