ronf / asyncssh

AsyncSSH is a Python package which provides an asynchronous client and server implementation of the SSHv2 protocol on top of the Python asyncio framework.
Eclipse Public License 2.0

Future exception was never retrieved - ConnectionLost #318

Closed: jtougas closed this issue 3 years ago

jtougas commented 4 years ago

I'm trying to handle things correctly so that this error log doesn't show up under a particular and somewhat contrived scenario. (I tried posting to the user mailing list, twice, but my message was being deleted as soon as I submitted it.)

[ERROR][asyncio][base_events.py:1707] Future exception was never retrieved
future: <Future finished exception=ConnectionLost('Connection lost')

It should be possible to exit cleanly with no error logs in all cases, right?

Here's the program I've come up with to reproduce the issue (about 7 or 8 times out of 10), and the log of its execution.

import asyncio
import logging
from asyncio import gather, get_event_loop
from time import sleep

import asyncssh
from asyncssh import SSHClient

from redacted.lib.correlate import with_new_correlation_context
from redacted.lib.entrypoint import wrap_entry_point

logger = logging.getLogger(__name__)

class MySSHClient(SSHClient):

    def connection_made(self, conn):
        logger.info('connection_made - blocking sleep for 3 seconds')
        sleep(3)

    def connection_lost(self, exc):
        logger.info('connection_lost')

async def do_work():
    async with asyncssh.connect(
        host='172.20.50.53',
        port=22,
        username='***redacted***',
        password='***redacted***',
        known_hosts=None,
        client_factory=MySSHClient
    ) as c:
        logger.info("Ready to use connection")

@with_new_correlation_context
async def _impl():
    try:
        await asyncio.wait_for(do_work(), timeout=3)
    except asyncio.exceptions.TimeoutError:
        return False
    return True

@wrap_entry_point
async def main():
    get_event_loop().set_debug(True)
    await gather(_impl(), _impl(), return_exceptions=False)

if __name__ == '__main__':
    main()
jtougas commented 4 years ago

I should mention that this was run in a Docker container with Ubuntu 20, Python 3.8 and asyncssh 2.4.

ronf commented 4 years ago

> I'm trying to handle things correctly so that this error log doesn't show up under a particular and somewhat contrived scenario. (I tried posting to the user mailing list, twice, but my message was being deleted as soon as I submitted it.)

I'm not sure why, but Google Groups decided your e-mail might be spam. I have since approved the messages, but then I noticed you had also posted here, so I thought this might be a better place to answer.

The ConnectionLost error happens if the connection is closed uncleanly. If you don't want the error about not retrieving the exception, you'll want to add an exception handler around your "async with asyncssh.connect" block. That way, it won't report the exception as unretrieved when you exit that coroutine.
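The log message itself comes from asyncio rather than AsyncSSH: when a Future holding an exception is garbage-collected without anyone awaiting it or calling result() or exception() on it, asyncio reports it through the loop's exception handler. A minimal stdlib-only sketch of that mechanism; `ConnectionLost` here is a hypothetical stand-in class, not AsyncSSH's own, and the custom exception handler only captures what asyncio would otherwise log.

```python
import asyncio
import gc


class ConnectionLost(Exception):
    """Hypothetical stand-in for asyncssh's ConnectionLost exception."""


def unretrieved_reports(retrieve: bool) -> list:
    """Return the messages asyncio reports for one failed Future."""
    loop = asyncio.new_event_loop()
    reports = []
    # Capture reports instead of letting asyncio log them.
    loop.set_exception_handler(lambda _loop, ctx: reports.append(ctx["message"]))
    fut = loop.create_future()
    fut.set_exception(ConnectionLost("Connection lost"))
    if retrieve:
        # Awaiting fut, fut.result() or fut.exception() all mark it retrieved.
        fut.exception()
    del fut       # drop the last reference so the Future is finalized
    gc.collect()  # make sure it has really been collected
    loop.close()
    return reports
```

Only the unretrieved case produces a report, which is why handling the exception in the code that owns the Future makes the message go away.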

There's something else I noticed, though it might just be in the snippet you showed here and not in your actual code. You are using "async with" around the connect, but then just logging something and exiting the block. When you exit the "async with" block, the connection is automatically closed. If you want to keep using the connection after that block returns, you'll need to use "await" rather than "async with", and then later call close() and await wait_closed() on the connection. Alternatively, do an "async with conn" (where conn is the return value of awaiting connect()) at some later point where you are doing the actual I/O on the connection.
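The "await, then close() and wait_closed()" lifecycle described above can be sketched as follows. To keep it runnable without an SSH server, the connect coroutine and the work function are passed in; in real code `connect` would be something like `lambda: asyncssh.connect(host, known_hosts=None)`. The helper name `connect_then_use_later` is hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def connect_then_use_later(
    connect: Callable[[], Awaitable[Any]],
    work: Callable[[Any], Awaitable[Any]],
) -> Any:
    # "await connect()" (rather than "async with connect() as conn") returns a
    # connection that stays open after the statement that created it.
    conn = await connect()
    try:
        # ... later, do the actual I/O on the connection
        return await work(conn)
    finally:
        # Explicit cleanup, doing what leaving "async with" would have done.
        conn.close()
        await conn.wait_closed()
```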

jtougas commented 4 years ago

Here's the modified part, which catches the exception and continues. The ConnectionLost error log remains, however.

async def do_work():
    try:
        async with asyncssh.connect(
            host='...',
            port=22,
            username='...',
            password='...',
            known_hosts=None,
            client_factory=MySSHClient
        ) as c:
            logger.info("Ready to use connection")
    except BaseException as e:
        logger.info(f"handle exception ({type(e)})")
...
[2020-09-27 14:01:05,092][03a876c1-9eec-4406-ad4b-ea5966004822][MainThread][INFO][__main__][test.py:18] connection_made - blocking sleep for 3 seconds
[2020-09-27 14:01:08,095][][MainThread][WARNING][asyncio][base_events.py:1854] Executing <Handle SSHConnection.connection_made(<_SelectorSoc...e, bufsize=0>>) created at /usr/lib/python3.8/asyncio/selector_events.py:766> took 3.004 seconds
[2020-09-27 14:01:08,097][03a876c1-9eec-4406-ad4b-ea5966004822][MainThread][INFO][__main__][test.py:37] handle exception (<class 'asyncio.exceptions.CancelledError'>)
[2020-09-27 14:01:08,098][1aab8b61-cf95-4f67-95a8-4223987694ee][MainThread][INFO][__main__][test.py:37] handle exception (<class 'asyncio.exceptions.CancelledError'>)
[2020-09-27 14:01:08,100][1aab8b61-cf95-4f67-95a8-4223987694ee][MainThread][DEBUG][asyncssh][logging.py:79] [conn=0] Received key exchange request
[2020-09-27 14:01:08,101][1aab8b61-cf95-4f67-95a8-4223987694ee][MainThread][DEBUG][asyncssh][logging.py:79] [conn=0] Beginning key exchange
[2020-09-27 14:01:08,102][03a876c1-9eec-4406-ad4b-ea5966004822][MainThread][INFO][asyncssh][logging.py:79] [conn=1] Connection lost
[2020-09-27 14:01:08,103][03a876c1-9eec-4406-ad4b-ea5966004822][MainThread][INFO][__main__][test.py:22] connection_lost
[2020-09-27 14:01:08,103][][MainThread][ERROR][asyncio][base_events.py:1707] Future exception was never retrieved
future: <Future finished exception=ConnectionLost('Connection lost') created at /usr/lib/python3.8/asyncio/base_events.py:422>
...
ronf commented 4 years ago

Hmm - one thing I noticed here is that the ConnectionLost is being reported on conn=1:

[2020-09-27 14:01:08,102][03a876c1-9eec-4406-ad4b-ea5966004822][MainThread][INFO][asyncssh][logging.py:79] [conn=1] Connection lost

The other exceptions you are handling seem to be on connection 0, though.

Do you have any earlier log messages related to connection 1? I guess your _impl() code is called twice, so we'd expect there to be two connections happening simultaneously here, but it's not clear how far that second connection is getting.

The other log message of interest is:

[2020-09-27 14:01:08,103][03a876c1-9eec-4406-ad4b-ea5966004822][MainThread][INFO][__main__][test.py:22] connection_lost

That seems to be coming from the MySSHClient.connection_lost() method, so the notification is making it there, though that doesn't identify which of the two connections that was associated with.

It would be helpful to get more information about what task it was that the exception was never retrieved on. I've tried running your sample code here, but I'm not able to reproduce the problem on either MacOS or Linux. I get the following output:

DEBUG:asyncio:Using selector: EpollSelector
INFO:asyncssh:Opening SSH connection to localhost, port 22
DEBUG:asyncio:Get address info localhost:22, type=<SocketKind.SOCK_STREAM: 1>
DEBUG:asyncio:Getting address info localhost:22, type=<SocketKind.SOCK_STREAM: 1> took 1.120ms: [(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 22))]
INFO:asyncssh:Opening SSH connection to localhost, port 22
DEBUG:asyncio:Get address info localhost:22, type=<SocketKind.SOCK_STREAM: 1>
DEBUG:asyncio:Getting address info localhost:22, type=<SocketKind.SOCK_STREAM: 1> took 0.059ms: [(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 22))]
INFO:asyncssh:[conn=0] Connection to localhost, port 22 succeeded
INFO:asyncssh:[conn=0]   Local address: 127.0.0.1, port 55702
INFO:__main__:connection_made - blocking sleep for 3 seconds
WARNING:asyncio:Executing <Handle SSHConnection.connection_made(<_SelectorSoc...e, bufsize=0>>) created at /usr/lib/python3.8/asyncio/selector_events.py:756> took 3.007 seconds
DEBUG:asyncio:<asyncio.TransportSocket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 55702), raddr=('127.0.0.1', 22)> connected to localhost:22: (<_SelectorSocketTransport fd=6 read=polling write=<idle, bufsize=0>>, <asyncssh.connection.SSHClientConnection object at 0x7fa125bc1be0>)
INFO:asyncssh:[conn=1] Connection to localhost, port 22 succeeded
INFO:asyncssh:[conn=1]   Local address: 127.0.0.1, port 55704
INFO:__main__:connection_made - blocking sleep for 3 seconds
WARNING:asyncio:Executing <Handle SSHConnection.connection_made(<_SelectorSoc...e, bufsize=0>>) created at /usr/lib/python3.8/asyncio/selector_events.py:756> took 3.008 seconds
DEBUG:asyncssh:[conn=0] Requesting key exchange
DEBUG:asyncssh:[conn=0] Received key exchange request
DEBUG:asyncssh:[conn=0] Beginning key exchange
DEBUG:asyncio:<asyncio.TransportSocket fd=7, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 55704), raddr=('127.0.0.1', 22)> connected to localhost:22: (<_SelectorSocketTransport fd=7 read=polling write=<idle, bufsize=0>>, <asyncssh.connection.SSHClientConnection object at 0x7fa125b4f5e0>)
DEBUG:asyncssh:[conn=1] Requesting key exchange
DEBUG:asyncssh:[conn=1] Received key exchange request
DEBUG:asyncssh:[conn=1] Beginning key exchange
DEBUG:asyncio:Close <_UnixSelectorEventLoop running=False closed=False debug=True>
jtougas commented 4 years ago

The second bracketed field in the log is the correlation ID, so you can see that the connection loss occurs on conn=1:

[2020-09-27 14:01:02,029][03a876c1-9eec-4406-ad4b-ea5966004822][MainThread][INFO][asyncssh][logging.py:79] Opening SSH connection to ps8adm1, port 22
[2020-09-27 14:01:02,029][03a876c1-9eec-4406-ad4b-ea5966004822][ThreadPoolExecutor-0_0][DEBUG][asyncio][base_events.py:805] Get address info ps8adm1:22, type=<SocketKind.SOCK_STREAM: 1>
[2020-09-27 14:01:02,076][03a876c1-9eec-4406-ad4b-ea5966004822][ThreadPoolExecutor-0_0][DEBUG][asyncio][base_events.py:815] Getting address info ps8adm1:22, type=<SocketKind.SOCK_STREAM: 1> took 46.263ms: [(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('172.20.10.30', 22))]
[2020-09-27 14:01:05,091][03a876c1-9eec-4406-ad4b-ea5966004822][MainThread][INFO][asyncssh][logging.py:79] [conn=1] Connection to ps8adm1, port 22 succeeded
[2020-09-27 14:01:05,091][03a876c1-9eec-4406-ad4b-ea5966004822][MainThread][INFO][asyncssh][logging.py:79] [conn=1]   Local address: 172.17.0.3, port 50292
[2020-09-27 14:01:05,092][03a876c1-9eec-4406-ad4b-ea5966004822][MainThread][INFO][__main__][test.py:18] connection_made - blocking sleep for 3 seconds
[2020-09-27 14:01:08,097][03a876c1-9eec-4406-ad4b-ea5966004822][MainThread][INFO][__main__][test.py:37] handle exception (<class 'asyncio.exceptions.CancelledError'>)
[2020-09-27 14:01:08,102][03a876c1-9eec-4406-ad4b-ea5966004822][MainThread][INFO][asyncssh][logging.py:79] [conn=1] Connection lost
[2020-09-27 14:01:08,103][03a876c1-9eec-4406-ad4b-ea5966004822][MainThread][INFO][__main__][test.py:22] connection_lost

That said, if you're not able to reproduce this, there's probably something else going on. I'll remove the rest of my code that is still being run here (@with_new_correlation_context, @wrap_entry_point) and make sure I can still reproduce it.

jtougas commented 4 years ago

Here's the whole thing in a Docker container that connects to its own SSH server. It doesn't reproduce as often (maybe 2 out of 10 runs), but you should be able to get the same result this way.

https://github.com/jtougas/lost-connection

ronf commented 4 years ago

Thanks! It took several tries, but I was able to reproduce the problem here with the Docker container you provided.

The future object with the exception that's not being retrieved is the one created by the connect call:

source_traceback: Object created at (most recent call last):
  File "prog.py", line 177, in <module>
    asyncio.get_event_loop().run_until_complete(main())
  File "/usr/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
    self._run_once()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1851, in _run_once
    handle._run()
  File "/usr/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "prog.py", line 49, in _decorator_parent_ctx
    return await _decorator_in_child_ctx()
  File "prog.py", line 45, in _decorator_in_child_ctx
    return await self._fn(*args, **kwargs)
  File "prog.py", line 144, in do_work
    async with asyncssh.connect(
  File "/usr/local/lib/python3.8/dist-packages/asyncssh/misc.py", line 206, in __aenter__
    self._result = await self._coro
  File "/usr/local/lib/python3.8/dist-packages/asyncssh/connection.py", line 6383, in connect
    return await _connect(options.host, options.port, loop, options.tunnel,
  File "/usr/local/lib/python3.8/dist-packages/asyncssh/connection.py", line 203, in _connect
    _, conn = await loop.create_connection(conn_factory, host, port,
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1050, in create_connection
    transport, protocol = await self._create_connection_transport(
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1068, in _create_connection_transport
    protocol = protocol_factory()
  File "/usr/local/lib/python3.8/dist-packages/asyncssh/connection.py", line 6374, in conn_factory
    return SSHClientConnection(loop, options, wait='auth')
  File "/usr/local/lib/python3.8/dist-packages/asyncssh/connection.py", line 2515, in __init__
    super().__init__(loop, options, acceptor, error_handler,
  File "/usr/local/lib/python3.8/dist-packages/asyncssh/connection.py", line 409, in __init__

Before this, I do see the handle_exception output:

[2020-09-30 02:37:32,897][857c95ad-97a7-437f-a7bf-f7af9c8f8838][MainThread][INFO][__main__][prog.py:154] handle exception (<class 'asyncio.exceptions.CancelledError'>)
[2020-09-30 02:37:32,897][70934ad6-40a1-493c-af97-f3b86b7baa81][MainThread][INFO][__main__][prog.py:154] handle exception (<class 'asyncio.exceptions.CancelledError'>)
[2020-09-30 02:37:32,898][70934ad6-40a1-493c-af97-f3b86b7baa81][MainThread][DEBUG][asyncssh][logging.py:79] [conn=0] Completed key exchange
[2020-09-30 02:37:32,899][70934ad6-40a1-493c-af97-f3b86b7baa81][MainThread][INFO][asyncssh][logging.py:79] [conn=0] Beginning auth for user root
[2020-09-30 02:37:32,899][857c95ad-97a7-437f-a7bf-f7af9c8f8838][MainThread][INFO][asyncssh][logging.py:79] [conn=1] Connection lost
[2020-09-30 02:37:32,899][857c95ad-97a7-437f-a7bf-f7af9c8f8838][MainThread][INFO][__main__][prog.py:139] connection_lost

This exception should be awaited in your calling code as part of the "async with" around the connect(), though.

Perhaps the problem here is that you are exiting your asyncio event loop too quickly, before the coroutine waiting on that Future has had a chance to run. In my unit tests, I've sometimes seen this kind of error when I don't wait a little after the main thread errors out before closing the event loop. If you put a short asyncio.sleep() at the end of main (after the try/except block), do you still see the problem?

ronf commented 4 years ago

Just tried it here, and unfortunately that didn't help.

ronf commented 4 years ago

Looking more closely, the traceback shows that the _connect() call is actually still in loop.create_connection() when the problem occurs, so it hasn't had a chance to get into conn.wait_established(), which would pick up the Future object. Perhaps that's the problem here: if you cancel the call while it's still waiting for the TCP connection to be established, but that connection HAS actually been established and control just hasn't made it back to the calling code yet, the exception from the other coroutine that's actually finishing the connect might never be picked up.
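That race can be reproduced with plain asyncio. In this sketch, `FakeConnection` plays the role of SSHClientConnection (it creates its own waiter Future when constructed), `fake_create_connection` plays loop.create_connection() (it calls the protocol factory and then keeps going), and `ConnectionError` stands in for ConnectionLost. All of these names are hypothetical; they are not AsyncSSH internals.

```python
import asyncio
import gc


class FakeConnection:
    """Hypothetical protocol object that creates its own waiter up front."""

    def __init__(self, loop: asyncio.AbstractEventLoop):
        self.waiter = loop.create_future()

    def connection_lost(self) -> None:
        if not self.waiter.done():
            self.waiter.set_exception(ConnectionError("Connection lost"))


async def fake_create_connection(factory, created: list):
    conn = factory()            # the protocol (and its waiter) now exists...
    created.append(conn)
    await asyncio.sleep(0.1)    # ...but create_connection hasn't returned yet
    return conn


async def fake_connect(created: list):
    loop = asyncio.get_running_loop()
    conn = await fake_create_connection(lambda: FakeConnection(loop), created)
    await conn.waiter           # like wait_established(); never reached here
    return conn


async def scenario() -> None:
    created = []
    task = asyncio.create_task(fake_connect(created))
    await asyncio.sleep(0)      # let the task get into fake_create_connection
    task.cancel()               # e.g. an outer wait_for() timing out
    try:
        await task
    except asyncio.CancelledError:
        pass
    created[0].connection_lost()  # the peer goes away afterwards


def run_scenario() -> list:
    loop = asyncio.new_event_loop()
    reports = []
    loop.set_exception_handler(lambda _loop, ctx: reports.append(ctx["message"]))
    loop.run_until_complete(scenario())
    gc.collect()                # the orphaned waiter is finalized by now
    loop.close()
    return reports
```

The cancelled task never reaches the line that awaits the waiter, so nobody retrieves the exception stored on it.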

ronf commented 4 years ago

Unfortunately, if loop.create_connection() hasn't returned yet, AsyncSSH won't have a "conn" object in the _connect() code, so it won't have any access to the _waiter Future object.

Perhaps one thing that could help you here is to use the AsyncSSH login_timeout argument to put a timeout on the initial connection, rather than using your own wait_for(). Once the connection is established, you can still use wait_for() for other operations if you like; the point is to avoid cancelling things in the middle of creating the SSHClientConnection.
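A sketch of that suggestion: the deadline for connection setup goes to AsyncSSH's login_timeout option instead of an outer asyncio.wait_for(), and wait_for() is only used once a connection exists. The helper names are hypothetical, and `connect` is injectable only so the sketch can be exercised without a server; by default it is asyncssh.connect. Which exception AsyncSSH raises when the login timeout expires isn't stated in this thread, so the sketch simply lets it propagate.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def open_connection(
    host: str,
    login_timeout: float = 3.0,
    connect: Optional[Callable[..., Awaitable[Any]]] = None,
    **options: Any,
) -> Any:
    if connect is None:
        import asyncssh  # imported lazily so the sketch runs without it
        connect = asyncssh.connect
    # AsyncSSH enforces the deadline itself, so the connect is never
    # cancelled from outside while the connection object is being created.
    return await connect(host, login_timeout=login_timeout, **options)


async def run_command(conn: Any, command: str, timeout: float) -> Any:
    # Once connected, wrapping individual operations in wait_for() is fine.
    return await asyncio.wait_for(conn.run(command), timeout=timeout)
```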

jtougas commented 3 years ago

Yup that'll work. Thank you for taking time to work through this with me! :beers:

ronf commented 3 years ago

Happy to help - let me know if you have any other questions!

pocek commented 2 years ago

I think the conclusion here is not good enough. I've spent the whole day and a good part of the night debugging (and patching) this, and was about to open a new issue when I found this one... :weary:

Below is a simple test case: a test server that immediately closes every connection, and a simple client that does nothing wrong; it just cancels its asyncssh.connect() calls. Please see future-exception-was-never-retrieved for my take on actually fixing this issue. Without the patch I hit "Future exception was never retrieved" every time, on both develop and v2.8.1. If you think it's PR-worthy, please let me know.

I hit this bug in a real-world app that is careful to finish all its tasks gracefully when interrupted (and avoids hacks where possible). As things stand, it's really quite easy to get bitten by this.

# Test server

import socket

def dummy_server():
    server_socket = socket.create_server(('', 4321), reuse_port=True)
    while True:
        conn, addr = server_socket.accept()
        conn.close()

dummy_server()

# Test client

import asyncio
import asyncssh
import logging

# Increase until asyncio logs "Future exception was never retrieved"
N_CONNS = 10

async def connect():
    await asyncssh.connect('localhost', 4321)

async def test():
    tasks = []

    for i in range(N_CONNS):
        task = asyncio.create_task(connect())
        tasks.append(task)

        # Let the loop spin
        await asyncio.sleep(0)

    for task in tasks:
        task.cancel()

    await asyncio.wait(tasks)

    for i, task in enumerate(tasks, 1):
        try:
            result = task.result()
        except BaseException as exc:
            print(f'{i} result: cancelled={task.cancelled()} exception={exc!r}')
        else:
            print(f'{i} result: cancelled={task.cancelled()} result={result}')

logging.basicConfig(level=logging.WARNING)
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.slow_callback_duration = 200
loop.run_until_complete(test())
loop.close()
ronf commented 2 years ago

Thank you for the report, and the test code. I am able to use that code to reproduce the issue here, which will definitely help!

I haven't had a chance to really dig into it, yet, but I should hopefully have some time in the next few days to do so.

I also took a look at your link above with a proposed fix. My one concern about that code is that you may be trading one race condition for another. If you don't create the future until just before the client does a wait on it but the ConnectionLost occurs before the client begins waiting, the client may block forever waiting for a future which is never filled in with a result. That was the reasoning behind creating it up front.
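The race described here can be shown with two hypothetical waiter classes: one creates its Future only when someone starts waiting, the other creates it up front. In both cases the "connection lost" notification arrives before the wait begins. `ConnectionError` stands in for ConnectionLost, and the wait_for() timeout exists only so the lazy case returns instead of blocking forever.

```python
import asyncio


class LazyWaiter:
    """Hypothetical: creates its Future only when someone starts waiting."""

    def __init__(self):
        self._fut = None

    def notify_lost(self, exc: Exception) -> None:
        if self._fut is not None and not self._fut.done():
            self._fut.set_exception(exc)
        # Otherwise nobody is waiting yet, so the notification is dropped.

    async def wait(self):
        self._fut = asyncio.get_running_loop().create_future()
        return await self._fut


class EagerWaiter:
    """Hypothetical: creates its Future up front, so early results are kept."""

    def __init__(self, loop: asyncio.AbstractEventLoop):
        self._fut = loop.create_future()

    def notify_lost(self, exc: Exception) -> None:
        if not self._fut.done():
            self._fut.set_exception(exc)

    async def wait(self):
        return await self._fut


async def outcome(waiter) -> str:
    # The loss is reported *before* anyone starts waiting.
    waiter.notify_lost(ConnectionError("Connection lost"))
    try:
        await asyncio.wait_for(waiter.wait(), timeout=0.1)
    except asyncio.TimeoutError:
        return "blocked"
    except ConnectionError:
        return "error delivered"
    return "no error"
```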

Could the issue be that the error is being returned by the call to loop.create_connection() in _connect(), rather than happening later when it is sitting in wait_established()? In that case, we'd never get back the conn object to wait on it, but I would generally expect any errors happening there to happen before the conn_factory is called, which would mean there wouldn't be a future created that needed to be waited on. In this case, we do seem to be getting far enough to create the future, but then I'd expect the client would always drop into wait_established() if a conn was returned, cleaning up any result set on that future.

ronf commented 2 years ago

Ok - I don't have a complete answer yet, but I think the main source of the problem here is your call to task.cancel(). It looks like what may be happening is that the cancel() is causing the loop.create_connection() to abort without giving it any opportunity to do any cleanup, but SOME of the time that call gets far enough to create an SSHClientConnection with the future associated with it. Since the calling task is cancelled before getting into wait_established(), you see this error about an exception never being retrieved, which is actually correct.

If possible it would be better here to not cancel the task, and just let it complete with the error, which you can then pick up in the calling code if you like. When you call cancel(), you can never be sure about how far a task got before that exception occurs, and in a case like this I can't even fix it by trying to catch the CancelledError, as I don't get back the return value of "conn" needed to do the cleanup.
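Following that advice, a caller can start several connects and let each one finish on its own, collecting the failures instead of cancelling. A sketch using asyncio.gather(..., return_exceptions=True); `fake_connect` is a hypothetical stand-in for asyncssh.connect() against a server that drops every connection.

```python
import asyncio


async def fake_connect(i: int):
    """Hypothetical stand-in for asyncssh.connect() against a server that
    drops every connection."""
    await asyncio.sleep(0.01)
    raise ConnectionError(f"conn {i}: Connection lost")


async def connect_all(n: int) -> list:
    # No cancel(): every connect runs to completion, and gather() hands back
    # each exception as a value, so every one of them is retrieved.
    results = await asyncio.gather(
        *(fake_connect(i) for i in range(n)), return_exceptions=True
    )
    return [r for r in results if isinstance(r, BaseException)]
```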

pocek commented 2 years ago

> I also took a look at your link above with a proposed fix. My one concern about that code is that you may be trading one race condition for another. If you don't create the future until just before the client does a wait on it but the ConnectionLost occurs before the client begins waiting, the client may block forever waiting for a future which is never filled in with a result. That was the reasoning behind creating it up front.

Good point! Could you please take a look at future-exception-was-never-retrieved-v2? I think it's fixed now. I also added some comments.

> If possible it would be better here to not cancel the task, and just let it complete with the error, which you can then pick up in the calling code if you like.

I hear you, but this would mean I need to shield asyncssh.connect() from cancellation and wait for it to finish. If I have hundreds of connect() calls in flight, that can mean a lot of time: time a user would have to wait after hitting Ctrl+C. I stand by my position that making cancellation more robust here is very much worth it.
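What "shield and wait" looks like in practice, as a sketch: asyncio.shield() keeps the connect running when the caller is cancelled, and the caller then waits for it so that it can close whatever connection it produced. The helper `await_shielded` is hypothetical, and the close() call assumes an asyncssh-like connection object.

```python
import asyncio
from typing import Any


async def await_shielded(inner: "asyncio.Future[Any]") -> Any:
    """Hypothetical helper: wait for a connect task without letting a
    cancellation of the caller interrupt it."""
    try:
        return await asyncio.shield(inner)
    except asyncio.CancelledError:
        # The caller was cancelled (e.g. Ctrl+C), but the shielded connect
        # keeps running. Waiting for it lets us clean up what it produced;
        # this wait is the delay a user would sit through.
        try:
            conn = await inner
        except Exception:
            pass            # a failed connect leaves nothing to close
        else:
            conn.close()    # assumes an asyncssh-like close() method
        raise
```

With hundreds of connects in flight, the wait in the except branch is paid for the slowest of them, which is the cost described above.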

Also, asyncssh itself calls asyncio.wait_for() on _connect() internally, which means cancellation can already happen inside asyncssh, so it's possible to hit this issue without any app-level cancellation at all.
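This follows from how wait_for() behaves in general: when the timeout expires, it cancels the coroutine it is waiting on, so code inside that coroutine sees a CancelledError even though the application never called cancel(). A stdlib-only sketch with a hypothetical `slow_connect`:

```python
import asyncio


async def slow_connect(log: list) -> None:
    """Hypothetical stand-in for a connect that takes too long."""
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        log.append("inner cancelled")
        raise


async def connect_with_deadline(timeout: float) -> list:
    log = []
    try:
        await asyncio.wait_for(slow_connect(log), timeout=timeout)
    except asyncio.TimeoutError:
        log.append("timed out")
    return log
```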

ronf commented 2 years ago

> Good point! Could you please take a look at future-exception-was-never-retrieved-v2? I think it's fixed now. I also added some comments.

While I think this may address the potential race I mentioned, I'd like to try and avoid the duplication of the functionality of the Future object in the handling of the FastWaitResult.

The main problem here is that the Future is never returned to the _connect() code when the task is cancelled while trying to make the outbound connection. I think this can be addressed by creating the Future inside the _connect() function and passing it into the SSHClientConnection (via options). This allows _connect() to have access to the Future even if the task gets cancelled before the conn object is returned. This would allow the _connect() code to explicitly cancel the waiter when its task is cancelled, so that it doesn't later report that an exception was never retrieved.

I've got a prototype of this that seems to work against your test program. Here it is, if you'd like to try it out:

@@ -391,50 +391,61 @@ async def _connect(options: 'SSHConnectionOptions',
     proxy_command = options.proxy_command
     free_conn = True

+    options.waiter = loop.create_future()
+
     new_tunnel = await _open_tunnel(tunnel, options.passphrase)
     tunnel: _TunnelConnectorProtocol

-    if new_tunnel:
-        new_tunnel.logger.info('%s %s via %s', msg, (host, port), tunnel)
+    try:
+        if new_tunnel:
+            new_tunnel.logger.info('%s %s via %s', msg, (host, port), tunnel)

-        # pylint: disable=broad-except
-        try:
-            _, tunnel_session = await new_tunnel.create_connection(
-                cast(SSHTCPSessionFactory[bytes], conn_factory), host, port)
-        except Exception:
-            new_tunnel.close()
-            await new_tunnel.wait_closed()
-            raise
-        else:
-            conn = cast(_Conn, tunnel_session)
-            conn.set_tunnel(new_tunnel)
-    elif tunnel:
-        tunnel_logger = getattr(tunnel, 'logger', logger)
-        tunnel_logger.info('%s %s via SSH tunnel', msg, (host, port))
+            # pylint: disable=broad-except
+            try:
+                _, tunnel_session = await new_tunnel.create_connection(
+                    cast(SSHTCPSessionFactory[bytes], conn_factory),
+                    host, port)
+            except Exception:
+                new_tunnel.close()
+                await new_tunnel.wait_closed()
+                raise
+            else:
+                conn = cast(_Conn, tunnel_session)
+                conn.set_tunnel(new_tunnel)
+        elif tunnel:
+            tunnel_logger = getattr(tunnel, 'logger', logger)
+            tunnel_logger.info('%s %s via SSH tunnel', msg, (host, port))

-        _, tunnel_session = await tunnel.create_connection(
-                cast(SSHTCPSessionFactory[bytes], conn_factory), host, port)
+            _, tunnel_session = await tunnel.create_connection(
+                cast(SSHTCPSessionFactory[bytes], conn_factory),
+                host, port)

-        conn = cast(_Conn, tunnel_session)
-    elif proxy_command:
-        conn = await _open_proxy(loop, proxy_command, conn_factory)
-    else:
-        logger.info('%s %s', msg, (host, port))
+            conn = cast(_Conn, tunnel_session)
+        elif proxy_command:
+            conn = await _open_proxy(loop, proxy_command, conn_factory)
+        else:
+            logger.info('%s %s', msg, (host, port))

-        _, session = await loop.create_connection(
-            conn_factory, host, port, family=family,
-            flags=flags, local_addr=local_addr)
+            _, session = await loop.create_connection(
+                conn_factory, host, port, family=family,
+                flags=flags, local_addr=local_addr)

-        conn = cast(_Conn, session)
+            conn = cast(_Conn, session)
+    except asyncio.CancelledError as exc:
+        options.waiter.cancel()
+        raise

     try:
-        await conn.wait_established()
+        await options.waiter
         free_conn = False

         if new_tunnel:
             conn.set_tunnel(new_tunnel)

         return conn
+    except asyncio.CancelledError as exc:
+        options.waiter.cancel()
+        raise
     finally:
         if free_conn:
             conn.abort()
@@ -721,7 +732,7 @@ class SSHConnection(SSHPacketHandler, asyncio.Protocol):
         self._error_handler = error_handler
         self._server = server
         self._wait = wait
-        self._waiter = loop.create_future()
+        self._waiter = options.waiter

         self._transport: Optional[asyncio.Transport] = None
         self._local_addr = ''

The bulk of this change is actually just an indent change to add an extra try/except. The rest is just adding the code to cancel the waiter when asyncio.CancelledError is raised and passing in the waiter to SSHConnection instead of creating it there.
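The idea behind the prototype, reduced to plain asyncio and condensed into a single try block: the caller creates the waiter and hands it to the protocol factory, so it still holds the waiter if it is cancelled before create_connection() returns, and it cancels the waiter on the way out. All names are hypothetical stand-ins, not the AsyncSSH code. A cancelled Future is never reported as "exception was never retrieved", and the later connection-lost notification finds the waiter already done.

```python
import asyncio
import gc


class FakeConnection:
    """Hypothetical protocol object that uses a waiter supplied by its caller."""

    def __init__(self, waiter: asyncio.Future):
        self.waiter = waiter

    def connection_lost(self) -> None:
        if not self.waiter.done():   # already cancelled: nothing is recorded
            self.waiter.set_exception(ConnectionError("Connection lost"))


async def fake_create_connection(factory, created: list):
    conn = factory()
    created.append(conn)
    await asyncio.sleep(0.1)         # cancelled while still in here
    return conn


async def fixed_connect(created: list):
    waiter = asyncio.get_running_loop().create_future()
    try:
        conn = await fake_create_connection(lambda: FakeConnection(waiter), created)
        await waiter                 # like wait_established()
    except asyncio.CancelledError:
        waiter.cancel()              # the caller still owns the waiter
        raise
    return conn


async def scenario() -> None:
    created = []
    task = asyncio.create_task(fixed_connect(created))
    await asyncio.sleep(0)           # let the task get into create_connection
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    created[0].connection_lost()     # the peer goes away afterwards


def run_scenario() -> list:
    loop = asyncio.new_event_loop()
    reports = []
    loop.set_exception_handler(lambda _loop, ctx: reports.append(ctx["message"]))
    loop.run_until_complete(scenario())
    gc.collect()
    loop.close()
    return reports
```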

Let me know what you think!

ronf commented 2 years ago

In further testing, I found a couple of issues in the change above, but the basic approach looks solid. The updated fix is now available in the "develop" branch as commit 5cea2ef.

Thanks very much for your help on this! Please let me know if this fix doesn't work for you.

pocek commented 2 years ago

Sorry for going silent on this. I can't object to a cleaner solution. I hope I'll be able to test it tomorrow (CET time :smiley:). Thank you!

pocek commented 2 years ago

Just for the record: it works fine now.

ronf commented 2 years ago

Great - thanks for the confirmation, and for putting together the test code! This change will be included in the next release.