ronf / asyncssh

AsyncSSH is a Python package which provides an asynchronous client and server implementation of the SSHv2 protocol on top of the Python asyncio framework.
Eclipse Public License 2.0

Spurious calls to `SSHChannel._cleanup` starting from AsyncSSH 2.15.0 #683

Closed GlassOfWhiskey closed 1 week ago

GlassOfWhiskey commented 2 months ago

The `self._loop.call_soon(self._cleanup)` call in the `_discard_recv()` method of `SSHChannel` has been moved outside the `if self._recv_state == 'close_pending'` branch. This change causes the entire SSH connection to be invalidated when the `close()` method of `SSHChannel` is called while `_recv_state` is still `'open'`, i.e., when no close packet has been received yet. The proposed fix is to modify the `close()` method of `SSHChannel` as follows:

    def close(self) -> None:
        ...

        if self._recv_state != 'closed':
            # Discard unreceived data
            self._recv_state = 'closed'
            self._discard_recv()

The same fix should probably also be applied to the `abort()` method of the same class.

ronf commented 2 months ago

This should already be fixed in the "develop" branch (in a different way) -- see the discussion in #678 and commit 3698c93, which was a follow-on to commits 220b9d4 and 676534b.

The real problem here was not with calling cleanup, but rather with AsyncSSH receiving a channel request from the remote peer (in this case an exit_status message) after calling close() or abort() on the channel. Incorrect handling of that is what led to it closing the entire connection.
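The failure mode described here can be illustrated with a toy sketch (invented names, not AsyncSSH's actual code): a channel request such as `exit_status` arrives after cleanup has already dropped the session object, so a defensive handler needs to ignore late requests once the session reference has been cleared.

```python
class DemoSession:
    """Toy stand-in for an AsyncSSH session object."""

    def __init__(self):
        self.status = None

    def exit_status_received(self, status):
        self.status = status


class DemoChannel:
    """Toy stand-in for SSHChannel; all names here are hypothetical."""

    def __init__(self, session):
        self._session = session

    def cleanup(self):
        # Mirrors _cleanup() clearing the session reference
        self._session = None

    def process_exit_status(self, status):
        # Without this guard, a late exit_status request raises
        # AttributeError: 'NoneType' object has no attribute ...
        if self._session is None:
            return False  # request arrived after close: drop it
        self._session.exit_status_received(status)
        return True


chan = DemoChannel(DemoSession())
assert chan.process_exit_status(0) is True   # normal case
chan.cleanup()
assert chan.process_exit_status(1) is False  # late request, no crash
```

The point is only the ordering hazard: once cleanup runs, any still-in-flight channel request must be discarded rather than dispatched to the (now missing) session.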

GlassOfWhiskey commented 2 months ago

Hi, I just tried the latest commit in the develop branch and the problem is still present. On that branch, my originally proposed fix no longer works either. The problem still stems from calling `self._loop.call_soon(self._cleanup)` in the `_discard_recv()` method even when the channel is not closed. In particular, the problematic portion of the code is inside the `_cleanup` method itself:

if self._session is not None:
    self._session.connection_lost(exc)
    self._session = None

I think it should be changed to

if self._session is not None and self._recv_state == 'closed':
    self._session.connection_lost(exc)
    self._session = None

or something similar, in order to avoid spurious session terminations.

ronf commented 2 months ago

The original version of the code avoided calling _cleanup() when the channel wasn't fully closed in both directions, except in circumstances where the connection was lost and there was no chance of getting future messages from the peer to finish closing on the remaining open channels. Unfortunately, this caused a problem in the case of something like a timeout or KeyboardInterrupt where the local side of the channel was getting closed, but some remote servers didn't always reply with an immediate close() in response, particularly if they were in the middle of sending data (such as in an scp transfer). The end result was that the timeout/interrupt would basically hang waiting for the channel to close.

What I've attempted to do here is to allow _cleanup() to be called without waiting for the peer to send a close message. A regular application close will still wait for unsent data to all be written to the peer before the outbound close message is sent, but it won't wait for the peer to send a close message before triggering the cleanup.
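As a rough sketch of that ordering (toy code under invented names, not AsyncSSH internals): on a local close, unsent data is flushed first, the outbound close message is sent, and cleanup is triggered immediately instead of waiting for the peer's close.

```python
import asyncio

class ToyChannel:
    """Toy model of the close ordering described above."""

    def __init__(self):
        self.events = []
        self._send_buf = ["chunk1", "chunk2"]  # pending outbound data

    async def close(self):
        # 1. Drain any unsent data to the peer first
        while self._send_buf:
            self.events.append(f"sent {self._send_buf.pop(0)}")
        # 2. Send our outbound close message
        self.events.append("sent close")
        # 3. Trigger cleanup without waiting for the peer's close message
        self.events.append("cleanup")


chan = ToyChannel()
asyncio.run(chan.close())
assert chan.events == ["sent chunk1", "sent chunk2", "sent close", "cleanup"]
```

The trade-off discussed in this thread is step 3: running cleanup before the peer's close arrives avoids hangs on timeout/interrupt, but means late messages from the peer can still land on a cleaned-up channel.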

What exception are you getting in your tests, and does it end up triggering the entire connection to be closed? Do you have sample code that might allow me to reproduce what you're seeing? I found and fixed one case where the early cleanup triggered an exception that led to the connection closing, but perhaps the one you're seeing is in a different place.

GlassOfWhiskey commented 2 months ago

Hi @ronf, this script reproduces the issue

import asyncio
import os.path
import shlex
import sys
import tarfile
import tempfile
from typing import Tuple

import asyncssh
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa

from streamflow.deployment import aiotarstream
from streamflow.deployment.stream import StreamWriterWrapper

async def destroy_server(container_id: str) -> str:
    proc = await asyncio.create_subprocess_exec(
        *shlex.split(f"docker stop {container_id}"),
        stdin=None,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode == 0:
        return stdout.decode().strip()
    else:
        raise Exception(
            f"Error destroying Docker container: [{proc.returncode}] {stderr.decode().strip()}"
        )

def gen_keys() -> Tuple[str, str]:
    key = rsa.generate_private_key(
        backend=default_backend(), public_exponent=65537, key_size=4096
    )
    private_key = key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.PKCS8,
        serialization.NoEncryption(),
    )
    public_key = key.public_key().public_bytes(
        serialization.Encoding.OpenSSH, serialization.PublicFormat.OpenSSH
    )
    return public_key.decode("utf-8"), private_key.decode("utf-8")

async def run_client(private_key: str) -> None:
    async with asyncssh.connect(
        known_hosts=None,
        client_keys=[private_key],
        host="127.0.0.1",
        port=2222,
        username="linuxserver.io",
    ) as conn:
        with tempfile.NamedTemporaryFile(mode="w") as f:
            f.write("Example file")
            f.flush()
            f.seek(0)
            async with conn.create_process(
                f"tar xf - -C /config",
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL,
                encoding=None
            ) as proc:
                try:
                    stream = StreamWriterWrapper(proc.stdin)
                    async with aiotarstream.open(stream=stream, format=tarfile.GNU_FORMAT, mode="w") as tar:
                        await tar.add(f.name, arcname=os.path.basename(f.name))
                    print(f"Tarfile /config/{os.path.basename(f.name)} created")
                    await stream.close()
                except asyncssh.ProcessError as exc:
                    raise Exception(
                        f"Process exited with status {exc.exit_status}: {exc.stderr}"
                    )
            # This second connection fails with AsyncSSH >=2.15.0
            async with conn.create_process(
                    f"cat /config/{os.path.basename(f.name)}", stdin=asyncio.subprocess.DEVNULL
            ) as proc:
                try:
                    result = await proc.wait()
                    if proc.returncode != 0:
                        raise Exception(
                            f"Process exited with status {proc.returncode}: {result.stderr.strip()}"
                        )
                    else:
                        print(f"Tarfile /config/{os.path.basename(f.name)} created with content: {result.stdout.strip()}")
                except asyncssh.ProcessError as exc:
                    raise Exception(
                        f"Process exited with status {exc.exit_status}: {exc.stderr}"
                    )

async def run_server(public_key: str) -> str:
    proc = await asyncio.create_subprocess_exec(
        *shlex.split(
            f"docker run --rm --detach --interactive -p 2222:2222 -e PUBLIC_KEY='{public_key}' "
            "lscr.io/linuxserver/openssh-server"
        ),
        stdin=None,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode == 0:
        await asyncio.sleep(3)
        return stdout.decode().strip()
    else:
        raise Exception(
            f"Error creating Docker container: [{proc.returncode}] {stderr.decode().strip()}"
        )

async def run() -> None:
    public_key, private_key = gen_keys()
    container_id = await run_server(public_key)
    try:
        with tempfile.NamedTemporaryFile("w") as f:
            f.write(private_key)
            f.flush()
            await run_client(f.name)
    finally:
        await destroy_server(container_id)

try:
    asyncio.run(run())
except (OSError, asyncssh.Error) as exc:
    sys.exit("SSH connection failed: " + str(exc))

You can install the latest version of StreamFlow with `pip install streamflow==0.2.0.dev11`, or install it directly from the master branch at https://github.com/alpha-unito/streamflow.

ronf commented 2 months ago

Thank you for providing sample code, but there's still quite a lot here which has nothing to do with AsyncSSH. Would it be possible for you to trim away some of that code, particularly where it involves external dependencies like Docker and streamflow and reproduce without those?

Also, when you run this code, what output are you getting, and what kind of exception is raised, if any? If you enable debug logging, do you see any "Uncaught exception" messages?

GlassOfWhiskey commented 2 months ago

Hi @ronf, this is the updated script

import asyncio
import logging
import os.path
import shlex
import sys
import tempfile
from typing import Tuple

import asyncssh
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa

asyncssh.logging.logger.setLevel(logging.DEBUG)
asyncssh.logging.logger.set_debug_level(2)
asyncssh.logging.logger.logger.addHandler(logging.StreamHandler())

async def destroy_server(container_id: str) -> str:
    proc = await asyncio.create_subprocess_exec(
        *shlex.split(f"docker stop {container_id}"),
        stdin=None,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode == 0:
        return stdout.decode().strip()
    else:
        raise Exception(
            f"Error destroying Docker container: [{proc.returncode}] {stderr.decode().strip()}"
        )

def gen_keys() -> Tuple[str, str]:
    key = rsa.generate_private_key(
        backend=default_backend(), public_exponent=65537, key_size=4096
    )
    private_key = key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.PKCS8,
        serialization.NoEncryption(),
    )
    public_key = key.public_key().public_bytes(
        serialization.Encoding.OpenSSH, serialization.PublicFormat.OpenSSH
    )
    return public_key.decode("utf-8"), private_key.decode("utf-8")

async def run_client(private_key: str) -> None:
    async with asyncssh.connect(
        known_hosts=None,
        client_keys=[private_key],
        host="127.0.0.1",
        port=2222,
        username="linuxserver.io",
    ) as conn:
        with tempfile.NamedTemporaryFile(mode="w") as f:
            f.write("Example file")
            f.flush()
            f.seek(0)
            async with conn.create_process(
                f"tar xf - -C /config",
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL,
                encoding=None
            ) as proc:
                try:
                    tar = await asyncio.create_subprocess_exec(
                        *shlex.split(
                            f"tar chf - -C {os.path.dirname(f.name)} {os.path.basename(f.name)}"
                        ),
                        stdin=None,
                        stdout=asyncio.subprocess.PIPE,
                        stderr=asyncio.subprocess.PIPE,
                    )
                    stdout, stderr = await tar.communicate()
                    if tar.returncode == 0:
                        proc.stdin.write(stdout)
                        print(f"Tarfile /config/{os.path.basename(f.name)} created")
                        # Without these two lines, AsyncSSH throws `AttributeError`
                        # With these lines, it throws 'SSH connection failed: SSH connection closed'
                        # Both configurations work with AsyncSSH < 2.15.0
                        proc.stdin.close()
                        await proc.stdin.wait_closed()
                    else:
                        raise Exception(
                            f"Error creating tar archive for {f.name}: [{tar.returncode}] {stderr.decode().strip()}"
                        )
                except asyncssh.ProcessError as exc:
                    raise Exception(
                        f"Process exited with status {exc.exit_status}: {exc.stderr}"
                    )
            # This second connection fails with AsyncSSH >=2.15.0
            async with conn.create_process(
                    f"cat /config/{os.path.basename(f.name)}", stdin=asyncio.subprocess.DEVNULL
            ) as proc:
                try:
                    result = await proc.wait()
                    if proc.returncode != 0:
                        raise Exception(
                            f"Process exited with status {proc.returncode}: {result.stderr.strip()}"
                        )
                    else:
                        print(f"Tarfile /config/{os.path.basename(f.name)} created with content: {result.stdout.strip()}")
                except asyncssh.ProcessError as exc:
                    raise Exception(
                        f"Process exited with status {exc.exit_status}: {exc.stderr}"
                    )

async def run_server(public_key: str) -> str:
    proc = await asyncio.create_subprocess_exec(
        *shlex.split(
            f"docker run --rm --detach --interactive -p 2222:2222 -e PUBLIC_KEY='{public_key}' "
            "lscr.io/linuxserver/openssh-server"
        ),
        stdin=None,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode == 0:
        await asyncio.sleep(3)
        return stdout.decode().strip()
    else:
        raise Exception(
            f"Error creating Docker container: [{proc.returncode}] {stderr.decode().strip()}"
        )

async def run() -> None:
    public_key, private_key = gen_keys()
    container_id = await run_server(public_key)
    try:
        with tempfile.NamedTemporaryFile("w") as f:
            f.write(private_key)
            f.flush()
            await run_client(f.name)
    finally:
        await destroy_server(container_id)

try:
    asyncio.run(run())
except (OSError, asyncssh.Error) as exc:
    sys.exit("SSH connection failed: " + str(exc))

I removed all the StreamFlow dependencies. Docker is only used to provide a portable way to spin up an SSH server, but it is not mandatory: the important part is inside the `run_client` function. Feel free to modify or disable the server logic and connect somewhere else.

I also enabled logs. The error I receive is:

[conn=0] Uncaught exception
Traceback (most recent call last):
  File "/home/glassofwhiskey/Projects/streamflow/streamflow/venv/lib/python3.12/site-packages/asyncssh/connection.py", line 1329, in data_received
    while self._inpbuf and self._recv_handler():
                           ^^^^^^^^^^^^^^^^^^^^
  File "/home/glassofwhiskey/Projects/streamflow/streamflow/venv/lib/python3.12/site-packages/asyncssh/connection.py", line 1594, in _recv_packet
    processed = handler.process_packet(pkttype, seq, packet)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/glassofwhiskey/Projects/streamflow/streamflow/venv/lib/python3.12/site-packages/asyncssh/packet.py", line 237, in process_packet
    self._packet_handlers[pkttype](self, pkttype, pktid, packet)
  File "/home/glassofwhiskey/Projects/streamflow/streamflow/venv/lib/python3.12/site-packages/asyncssh/channel.py", line 656, in _process_request
    self._service_next_request()
  File "/home/glassofwhiskey/Projects/streamflow/streamflow/venv/lib/python3.12/site-packages/asyncssh/channel.py", line 416, in _service_next_request
    result = cast(Optional[bool], handler(packet))
                                  ^^^^^^^^^^^^^^^
  File "/home/glassofwhiskey/Projects/streamflow/streamflow/venv/lib/python3.12/site-packages/asyncssh/channel.py", line 1246, in _process_exit_status_request
    self._session.exit_status_received(status)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'exit_status_received'

After some debugging I had already done in StreamFlow, I discovered that this is due to a spurious call to the `_cleanup` function, as I mentioned in the PR. Indeed, the `_cleanup` function sets the `_session` attribute to `None`.

I also confirm that the script has the same behaviour with AsyncSSH v2.15.0, v2.16.0, and the master branch on GitHub.

MBradbury commented 2 months ago

Just wanted to add that I have also been seeing the same exception (specifically AttributeError: 'NoneType' object has no attribute 'exit_status_received') with 2.15.0. Rolling back to 2.14.0 has worked for me. I have not tested with the current develop.

In my experience the error has occurred when I have:

  1. Started a process with SSHClientConnection.create_process (this starts an ftp connection on the terminal)
  2. Before the created process would close I run another command on the same connection with SSHClientConnection.run (this tests if a file was retrieved by the ftp process)

I am able to run a number of commands using SSHClientConnection.run before encountering the exception.

ronf commented 2 months ago

Thanks, @GlassOfWhiskey.

I was able to run your script here, and confirmed I could reproduce the exception you mentioned on AsyncSSH 2.15.0 and 2.16.0 when I commented out the two lines mentioned in the comments. However, when I tried the exact same test on latest commit in the "develop" branch, the error no longer appears for me. Specifically, commit 3698c93 should prevent accessing the session object after it has been set to None.

Looking at the line numbers in the traceback above, AsyncSSH appears to be the version from 2.15.0 or 2.16.0, and not the latest version in "develop".

Could you try this again with the head of the "develop" branch?

@MBradbury, if you have a chance, could you try your test against the head of "develop" as well?

MBradbury commented 2 months ago

@ronf Hitting a slightly different exception with develop.

develop installed with:

python3 -m pip install -U --user git+https://github.com/ronf/asyncssh.git@develop

Traceback (some bits of my code cut out):

Traceback (most recent call last):
...
  File "", line 54, in test
    asyncio.get_event_loop().run_until_complete(
  File "/usr/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
...
  File "", line 127, in ...
    result = await conn.run(command,
  File "/home/mbradbury/.local/lib/python3.10/site-packages/asyncssh/connection.py", line 4496, in run
    process = await self.create_process(*args, **kwargs) # type: ignore
  File "/home/mbradbury/.local/lib/python3.10/site-packages/asyncssh/connection.py", line 4374, in create_process
    chan, process = await self.create_session(
  File "/home/mbradbury/.local/lib/python3.10/site-packages/asyncssh/connection.py", line 4267, in create_session
    session = await chan.create(session_factory, command, subsystem,
  File "/home/mbradbury/.local/lib/python3.10/site-packages/asyncssh/channel.py", line 1115, in create
    packet = await self._open(b'session')
  File "/home/mbradbury/.local/lib/python3.10/site-packages/asyncssh/channel.py", line 714, in _open
    return await self._open_waiter
asyncssh.misc.ChannelOpenError: SSH connection closed

In this traceback the code looks like:

  1. Started a process for which the lifetime was managed with async with
  2. async with's scope ended
  3. Ran another command over ssh <-- exception here

ronf commented 2 months ago

Did you exit out of the "async with" that opened the connection before trying to run another command? If so, that's expected. You can leave the scope associated with a session (such as a create_process call), but you need to stay within the scope of the asyncssh.connect() for this to work.
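The scoping rule described here can be illustrated with a toy async context manager (the names below are invented for illustration, not AsyncSSH's API): a resource obtained from a connection stays usable only while the connection's `async with` block is open, so running a command after leaving that block fails.

```python
import asyncio

class ToyConn:
    """Toy stand-in for an SSH connection managed by 'async with'."""

    def __init__(self):
        self.open = False

    async def __aenter__(self):
        self.open = True
        return self

    async def __aexit__(self, *exc):
        self.open = False  # leaving the scope closes the connection

    async def run(self, cmd):
        if not self.open:
            raise RuntimeError("connection closed")
        return f"ran {cmd}"


async def main():
    async with ToyConn() as conn:
        ok = await conn.run("A")  # inside the connection scope: fine
    try:
        await conn.run("B")       # outside the scope: connection is closed
        escaped = False
    except RuntimeError:
        escaped = True
    return ok, escaped


assert asyncio.run(main()) == ("ran A", True)
```

Child scopes (individual sessions or processes) may come and go freely, but commands must be issued while the enclosing connection scope is still alive.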

GlassOfWhiskey commented 2 months ago

Hi @ronf, I can confirm that the commit you pointed out fixes the errors. I don't know why my first attempt with the develop branch failed, but I have now checked again in a fresh Python environment and the errors are gone. Thank you very much. I'll wait for the next release to update the StreamFlow dependency.

MBradbury commented 2 months ago

Did you exit out of the "async with" that opened the connection before trying to run another command? If so, that's expected. You can leave the scope associated with a session (such as a create_process call), but you need to stay within the scope of the asyncssh.connect() for this to work.

I had not called close on a connection at this point. (In my code I manually call conn = await asyncssh.connect() and conn.close() which get managed in an __aenter__ and __aexit__ of another class.)

Rough code structure

async with Tester() as tester: # opens conn, closes conn in __aexit__
    await tester.run("A")

    async with tester.conn.create_process("B") as proc:
        ...
        proc.terminate()

    await tester.run("C") # Exception here

Having written this up I was wondering if proc.terminate() was the issue, but I now cannot reproduce the exception using develop.

ronf commented 2 months ago

@GlassOfWhiskey, thanks for the confirmation!

@MBradbury, that code structure looks ok. Regarding the "SSH connection closed", I do remember seeing that prior to the fix in commit 3698c93. It wasn't as common as the exit_status_received exception, but it did happen on occasion, as there was a race between a new session starting up and an old one getting cleaned up while there was still a message (exit status) in transit. Both of these errors should be fixed in this commit, though.

The proc.terminate() call shouldn't be an issue -- that just sends a TERM signal to a specific session, so it shouldn't have an impact on other sessions or the ability to open new sessions on the connection.

If you're able to reproduce this again with the latest "develop", please let me know. In the meantime, though, I'll look into getting the current fix released, since this is a regression.

ronf commented 2 months ago

AsyncSSH 2.17.0 is now available with this fix and some other minor bug fixes and improvements.

ronf commented 1 month ago

After additional testing, I found that even with the fix in 3698c93 I was still seeing problems in some cases. So, I backed out both of these changes and found a different way to address the issue in SCP that prompted this change. The next release will go back to the original approach at handling connection closes, returning to the behavior in AsyncSSH 2.14.0.

GlassOfWhiskey commented 1 week ago

Hi @ronf, do you know when this new version will be released? We are experiencing some issues with the latest AsyncSSH versions, too.

ronf commented 1 week ago

I'm just waiting for the dust to settle on a few different fixes, to make sure they all look ok. If nothing new comes up, I should be able to put together a new release this weekend.

ronf commented 1 week ago

This fix is now available in AsyncSSH 2.18.0.