ronf / asyncssh

AsyncSSH is a Python package which provides an asynchronous client and server implementation of the SSHv2 protocol on top of the Python asyncio framework.
Eclipse Public License 2.0
1.56k stars 156 forks source link

Intermittent failure to read output when using SSHClientProcess.wait #669

Closed ethanbb closed 4 months ago

ethanbb commented 4 months ago

asyncssh version: 2.14.1

I'm having an issue that may just be user error, as I'm new to asyncio - hoping I can get some guidance.

I'm trying to start an SSH process and then wrap a coroutine that waits for it to complete in a Task. My problem is that >50% of the time, I am reading an empty string instead of the output I expect from the SSHCompletedProcess object. This is a command that 100% always returns a line of output when run on the host (and I do get the output I expect about 30-40% of the time).

Here is my waiting coroutine:

async def wait_for_completion(conn: asyncssh.SSHClientConnection, proc: asyncssh.SSHClientProcess, timeout: Optional[float] = None,
                              check=False, read_output=False, expect_output=False) -> asyncssh.SSHCompletedProcess:
    """
    Coroutine that monitors a process for having exited and closes it if it runs beyond timeout seconds.
    If read_output is True, stdout will be consumed and printed to the terminal. Then it cannot be read from the returned object.
    """
    start_time = time.time()
    try:
        result = None
        while True:
            if read_output:  # this is always false, will probably remove
                print(proc.collect_output())
            try:
                result = await proc.wait(check=check, timeout=0.2)
                while not proc.stdout.at_eof():
                    await asyncio.sleep(0)
                break
            except asyncssh.TimeoutError:
                if timeout is None or time.time() - start_time < timeout:
                    continue
                else:
                    raise

        # I added this as a desperate attempt, but it didn't help
        # hack to maybe fix race condition?
        assert result is not None, 'Only way to exit is for result to be assigned'
        if expect_output and not read_output:
            wait_time = 5
            start_loop = time.time()
            while not result.stdout and time.time() - start_loop < wait_time:
                await asyncio.sleep(0.2)
            if not result.stdout:
                # print('SSH command: ', result.command)
                print('Result: ', result)
                print()
                raise RuntimeError(f'Output was not assigned in {wait_time} seconds')
        return result
    finally:
        if read_output:
            print(proc.collect_output())
        proc.close()
        conn.close()

Here is how I'm calling it:

# in start_command_on_host (note read_output/print_output is always false, will probably remove)
conn = await asyncssh.connect(host.name, username=host.user if host.user is not None else ())
proc = await conn.create_process(command, term_type='xterm-color', stderr=asyncssh.STDOUT)
return asyncio.create_task(wait_for_completion(conn, proc, timeout=timeout, check=check,
                                               read_output=print_output, expect_output=expect_output))
...
# in another coroutine
task = await start_command_on_host(command, host=host, print_output=False, timeout=timeout, check=True,
                                       use_srun=use_slurm, sbatch_opts=slurm_args, expect_output=expect_nonempty)
res = await task
output = get_string_output(res)

And this is the output I'm getting when it fails:

Result:  env: {}, command: bash -ic 'conda run -n mescore python -c '"'"'import cmcode; print(next(iter(cmcode.__path__)))'"'"'', subsystem: None, exit_status: 0, exit_signal: None, returncode: 0, stdout: , stderr:

Traceback (most recent call last):
  ... clipped ...
  File "\\proektdata\bigdata\eblackwood\2p_analysis\cmcode\caimanlab.py", line 29, in get_path_from_host
    output = await remoteops.get_output_from_command(command, host, timeout=timeout, expect_nonempty=True)
  File "\\proektdata\bigdata\eblackwood\2p_analysis\cmcode\remoteops.py", line 286, in get_output_from_command
    res = await task
  File "\\proektdata\bigdata\eblackwood\2p_analysis\cmcode\remoteops.py", line 202, in wait_for_completion
    raise RuntimeError(f'Output was not assigned in {wait_time} seconds')
RuntimeError: Output was not assigned in 5 seconds

Expected output (when running on remote host):

(base) ethan@gpu01:~$ bash -ic "conda run -n mescore python -c 'import cmcode; print(next(iter(cmcode.__path__)))'"
/mnt/bigdata/eblackwood/2p_analysis/cmcode

(base) ethan@gpu01:~$ bash -ic "conda run -n mescore python -c 'import cmcode; print(next(iter(cmcode.__path__)))'"
/mnt/bigdata/eblackwood/2p_analysis/cmcode

(base) ethan@gpu01:~$ bash -ic "conda run -n mescore python -c 'import cmcode; print(next(iter(cmcode.__path__)))'"
/mnt/bigdata/eblackwood/2p_analysis/cmcode

etc.

Anything I'm doing clearly wrong here, or could there be a bug?

ronf commented 4 months ago

I haven't actually tested the code above, but at first glance here are some thoughts:

If not writing EOF turns out to be the problem, I can see an argument for doing this automatically, or maybe distinguishing between None and an empty string in the input argument here to decide whether to call write_eof() or not. Callers passing in None (or not setting input) would get the current behavior, but callers passing in an empty string would send EOF on the channel without providing any input before waiting for output.

As an aside, you don't really need the stderr=asyncssh.STDOUT to combine stdout & stderr together in the case where you are setting a term_type, as that happens automatically when a PTY is requested, and that'll happen by default when you set a terminal type.

ronf commented 4 months ago

As for where to send requests like this, you might want to consider using the 'Discussions' section if you have usage questions, rather than 'Issues'. Sending to the asyncssh-users mailing list is also fine (as you did after creating this), but putting stuff in 'Discussions' rather than the mailing list will make it easier for other users to find your question in a search.

ethanbb commented 4 months ago

Thanks for the suggestions!

Doing an asyncio.sleep(0) in the first loop is probably not a good idea, as you'll basically end up eating 100% CPU in that task while that loop runs.

Noted, thanks. I saw in the asyncio docs that they actually suggest using 0 for long-running tasks (maybe I misunderstood?) but I have nothing against using a few tenths of a second instead.

You don't seem to be closing stdin on the process before calling proc.wait(). Whether this is a problem or not depends on the command you are running, but there are commands where stdout won't be closed until stdin is closed and so you may see it waiting until you hit the timeout without proc.stdout.at_eof() becoming True. If your command doesn't need any input, you might want to call proc.stdin.write_eof() before you call proc.wait().

Will try this, thanks.

As for the first loop, this should be unnecessary unless the amount of output expected is very large. You should be able to do a single call to proc.wait() with the full timeout. It should return as soon as stdout/stderr is closed with all of the collected output in result.stdout, or raise TimeoutError if that doesn't happen within the requested time. Basically, it should be possible to replace this entire block of code with a single call to proc.wait() with the full timeout (after writing EOF to stdin). You'd then get everything you need out of the result returned, or you'd get a timeout exception.

In the end, unless the output is very large, this whole thing (including the call to create_process()) can probably be replaced with a call to conn.run().

I wrote it this way so that output could be printed incrementally (originally I had an interactive program that asks for a password in mind) and I didn't want to try hooking sys.stdout up to the output stream because I didn't fully understand the warning about flushing buffers. Since I'm not planning to need that anymore, you're totally right, I could just create a task out of conn.run() instead - I will give it a try!

ronf commented 4 months ago

I saw in the asyncio docs that they actually suggest using 0 for long-running tasks (maybe I misunderstood?) but I have nothing against using a few tenths of a second instead.

Using a zero there can be good if you just want to give other asyncio tasks a chance to run until their next blocking point and then get right back to your long-running task. However, that assumes you're making incremental progress in the work your task needs to do each time it gets scheduled. In this case, if EOF hasn't been received, you could end up in an infinite loop there, making no progress each time you're rescheduled.

I wrote it this way so that output could be printed incrementally (originally I had an interactive program that asks for a password in mind) and I didn't want to try hooking sys.stdout up to the output stream because I didn't fully understand the warning about flushing buffers. Since I'm not planning to need that anymore, you're totally right, I could just create a task out of conn.run() instead - I will give it a try!

If you want to use conn.run() and you need to make sure stdin is closed from the start, you might want to use stdin=asyncsh.DEVNULL as one of the arguments. That'll prevent you from having to manually call write_eof() on the channel, which would only be possible if you had access to the process (or session), which would mean you'd have to keep the create_process() call separate. By using DEVNULL, though, you can do it all from conn.run().

Regarding incremental output, that would be a good use case for a shorter timeout in a loop. Normally, this would be done by calling something like proc.stdout.read(8192) wrapped in an asyncio.wait_for()., though it should also work to use proc.wait() with a timeout and then proc.collect_output() as you did in your first loop. Note that you'd wan to add end='' to the print() call around that, and that collect_output returns a tuple of stdout and stderr output, so you'll need to add something like a [0] on the value returned there. Doing incremental output like this may be necessary if the output is large enough to not all fit in the allowed receive buffer size, but that's only an issue if your output is megabytes in size.

ethanbb commented 4 months ago

Thanks again for all the tips. I was in a rush to get something working quickly, and clearly this is an area with some surprising complexity, but I managed to get there thanks to your freely offered expertise.

ronf commented 4 months ago

Happy to help - glad to hear you got it working!