ParallelSSH / parallel-ssh

Asynchronous parallel SSH client library.
https://parallel-ssh.org
GNU Lesser General Public License v2.1

How to work with long-running commands? #348

Closed asomers closed 2 years ago

asomers commented 2 years ago

Background

I'm trying to execute a long-running command on a remote server. It needs to be fed data through stdin, and it may occasionally print stuff through stdout and stderr. It will not terminate until it gets EOF on its input. I have a working program that uses Paramiko. It multiplexes over the SSH channel and another file descriptor with select, and reads from both sources in non-blocking mode. However, my attempt at using parallel-ssh exhibits a few bugs.
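
The multiplexing pattern described here can be sketched with the standard library alone. This is only an illustration under stated assumptions: a `socket.socketpair()` stands in for the SSH channel, the names `echo_peer`, `pump`, and `demo` are hypothetical, nothing here is parallel-ssh or Paramiko API, and it assumes a Unix-like system where `select` accepts pipe descriptors.

```python
import os
import select
import socket
import threading


def echo_peer(sock):
    """Hypothetical stand-in for the remote `cat`: echo bytes back until EOF."""
    with sock:
        while True:
            data = sock.recv(128)
            if not data:
                break
            sock.sendall(data)


def pump(data_fd, chan, chunk=128):
    """Forward bytes from data_fd to chan while collecting what chan sends back."""
    received = bytearray()
    readfds = [data_fd, chan]
    while readfds:
        readable, _, _ = select.select(readfds, [], [])
        if data_fd in readable:
            data = os.read(data_fd, chunk)
            if data:
                chan.sendall(data)
            else:
                # EOF on the local source: half-close so the peer sees EOF too.
                chan.shutdown(socket.SHUT_WR)
                readfds.remove(data_fd)
        if chan in readable:
            reply = chan.recv(chunk)
            if reply:
                received += reply
            else:
                # The peer closed its side, so there is nothing left to read.
                readfds.remove(chan)
    return bytes(received)


def demo(payload):
    """Push payload through a pipe and an echoing peer, return what came back."""
    local, remote = socket.socketpair()
    read_fd, write_fd = os.pipe()  # (read end, write end)
    peer = threading.Thread(target=echo_peer, args=(remote,))
    peer.start()
    try:
        # The payload is assumed small enough to fit in the pipe buffer, so
        # writing it up front does not block.
        os.write(write_fd, payload)
        os.close(write_fd)
        return pump(read_fd, local)
    finally:
        os.close(read_fd)
        local.close()
        peer.join()


if __name__ == "__main__":
    numbers = b"".join(f"{i}\n".encode() for i in range(100))
    print(len(demo(numbers)), "bytes echoed back")
```

The loop only touches a descriptor that `select` has reported as readable, so neither source can starve the other, and it ends once both sides have reached EOF.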

Describe the bug

  1. The output is duplicated. The program is supposed to print the numbers 0 through 999, but it usually prints something like 0..21 and then 0..21 again. Is there a better way to read partial output than for line in channel.stdout?
  2. Reading partial output hangs. Typically the program freezes after printing a few dozen numbers, while trying to read from channel.stdout. How can I do that in a nonblocking way?
  3. The program never terminates. I can work around problem 2 by changing the range of numbers sent from range(0, 1000) to range(0, 10). If I do that, it prints them correctly and also prints the debug message "Sending EoF", but then it appears to hang. What happens is that select always reports the channel as readable, but exit_code is always None, so the program busy-loops. Why doesn't exit_code get set?
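
One detail worth ruling out for problem 3, separately from why exit_code stays None: a truthiness test on an exit status, such as `if exit_code:`, is false for a successful status of 0 as well as for None, so a loop guarded that way would not stop on success either. A minimal sketch of the distinction, where `finished` is a hypothetical helper and not part of any library:

```python
def finished(exit_code):
    """Return True once an exit status is available, including status 0."""
    return exit_code is not None


# Compare a plain truthiness test with the explicit None check.
for code in (None, 0, 1):
    print(f"exit_code={code!r} truthy={bool(code)} finished={finished(code)}")
```

Only the explicit comparison against None separates "still running" from "exited successfully".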

To Reproduce

Execute this program like this: python3.9 cat-parallelssh.py my-host. To see problem 3, change the range arguments to 0, 10.

from concurrent.futures import ThreadPoolExecutor
import os
import select
import sys

from pssh.clients import SSHClient
from pssh.exceptions import Timeout
from ssh2 import error_codes

class Remote:
    def __init__(self, the_host):
        self.host = the_host
        self.client = SSHClient(self.host)

    def cat(self):
        """ Start a cat process and return the parallel-ssh HostOutput """
        cmd = "cat"
        channel = self.client.run_command(cmd, use_pty=True)
        return channel

class RemoteMigrator:
    def __init__(self, host):
        self.host = host
        self.remote = Remote(host)

    def migrate(self):
        channel = self.remote.cat()
        (pin, pout) = os.pipe()
        with ThreadPoolExecutor() as executor:
            fut = executor.submit(self._th_send, pin)
            status = self._feed_pipes(pout, channel)
            fut.result()
        self.sendpipe = None
        if status != 0:
            raise Exception(f"Remote: status={status}")

    def _th_send(threadname, sendpipe):
        # This must be in its own thread, because the original version invokes
        # a blocking C function to write to sendpipe.
        try:
            for i in range(0, 1000):
                os.write(sendpipe, (f"{i}\n".encode()))
        finally:
            os.close(sendpipe)

    def _feed_pipes(self, sendpipe, channel):
        BS = 128
        readfds = [channel.channel.session.sock.fileno(), sendpipe]
        try:
            while True:
                rl, wl, xl = select.select(readfds, [], [], 1)
                for readable in rl:
                    if readable == sendpipe:
                        data = os.read(sendpipe, BS)
                        try:
                            channel.stdin.write(data)
                            channel.stdin.flush()
                        except OSError as e:
                            sys.stderr.write("Error: remote terminated early: %s\n" % e)
                            break
                        if len(data) == 0:
                            # EOF indicates the send is done
                            print("Sending EoF")
                            channel.channel.send_eof()
                            readfds = [channel.channel.session.sock.fileno()]
                            break
                    else:
                        try:
                            for line in channel.stdout: # XXX Sometimes hangs
                                print(line)
                        except Timeout:
                            print("stdout timeout")
                        try:
                            for line in channel.stderr:
                                print(line, file=sys.stderr)
                        except Timeout:
                            print("stderr timeout")
                if len(rl) == 0:
                    print("select timeout")
                if channel.exit_code:
                    break
        finally:
            os.close(sendpipe)
        status = channel.exit_code
        channel.close()
        return status

migrator = RemoteMigrator(sys.argv[1])
migrator.migrate()
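
A portability note on the program above: `os.pipe()` returns its descriptors in the order (read end, write end), while the program hands the first descriptor to the writing thread and reads from the second. That only works where pipes are bidirectional, as on FreeBSD; on Linux, writing to the read end fails with EBADF. A minimal sketch of the conventional direction, with `pipe_roundtrip` as a hypothetical helper name and a payload assumed small enough to fit in the pipe buffer:

```python
import os


def pipe_roundtrip(payload: bytes) -> bytes:
    """Write payload into a pipe and read it back using the portable direction."""
    read_fd, write_fd = os.pipe()  # order is (read end, write end)
    try:
        os.write(write_fd, payload)
        os.close(write_fd)  # closing the write end lets the reader see EOF
        write_fd = None
        chunks = []
        while True:
            chunk = os.read(read_fd, 128)
            if not chunk:
                break
            chunks.append(chunk)
        return b"".join(chunks)
    finally:
        os.close(read_fd)
        if write_fd is not None:
            os.close(write_fd)


if __name__ == "__main__":
    print(pipe_roundtrip(b"0\n1\n2\n"))
```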

Expected behavior

It should print the numbers 0 through 999, inclusive, each on its own line, and then terminate. For comparison, this paramiko program does just that:

from concurrent.futures import ThreadPoolExecutor
import os
import select
import sys

import paramiko

class Remote:
    def __init__(self, the_host):
        self.host = the_host
        self.client = paramiko.SSHClient()
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.client.load_system_host_keys()
        self.client.connect(self.host)

    def cat(self):
        """ Start a cat process and return the paramiko channel """
        cmd = "cat"
        channel = self.client.get_transport().open_session()
        channel.exec_command(cmd)
        return channel

class RemoteMigrator:
    def __init__(self, host):
        self.host = host
        self.remote = Remote(host)

    def migrate(self):
        channel = self.remote.cat()
        (pin, pout) = os.pipe()
        with ThreadPoolExecutor() as executor:
            fut = executor.submit(self._th_send, pin)
            status = self._feed_pipes(pout, channel)
            fut.result()
        self.sendpipe = None
        if status != 0:
            raise Exception(f"Remote: status={status}")

    def _th_send(threadname, sendpipe):
        # This must be in its own thread, because the original version invokes
        # a blocking C function to write to sendpipe.
        try:
            for i in range(0, 1000):
                os.write(sendpipe, (f"{i}\n".encode()))
        finally:
            os.close(sendpipe)

    def _feed_pipes(self, sendpipe, channel):
        BS = 128
        readfds = [channel, sendpipe]
        try:
            while True:
                rl, wl, xl = select.select(readfds, [], [])
                for readable in rl:
                    if readable == sendpipe:
                        data = os.read(sendpipe, BS)
                        if len(data) == 0:
                            # EOF indicates the send is done
                            channel.shutdown_write()
                            readfds = [channel]
                            break
                        try:
                            channel.sendall(data)
                        except OSError as e:
                            sys.stderr.write("Error: remote terminated early: %s\n" % e)
                            break
                    if readable == channel:
                        if channel.recv_ready():
                            sys.stdout.write(channel.recv(BS).decode())
                        if channel.recv_stderr_ready():
                            sys.stderr.write(channel.recv_stderr(BS).decode())
                if channel.exit_status_ready():
                    break
            sys.stdout.write(channel.recv(BS).decode())
            sys.stderr.write(channel.recv_stderr(BS).decode())
        finally:
            os.close(sendpipe)
        status = channel.recv_exit_status()
        channel.close()
        return status

migrator = RemoteMigrator(sys.argv[1])
migrator.migrate()

Actual behaviour

It typically prints the numbers 0 through 21 twice and then hangs. The exact numbers printed vary from run to run.

Additional information

ssh2_python-0.27.0

pkittenis commented 2 years ago

Hi there,

Thanks for the interest.

Reading partial output from commands that do not terminate is described in the documentation.

Using the example as in the documentation, the behaviour is as expected:

output = client.run_command(
    'while true; do echo a line; sleep .1; done',
    use_pty=True, read_timeout=1)

# Read as many lines of output as hosts have sent before the timeout
stdout = []
for host_out in output:
    try:
        for line in host_out.stdout:
            stdout.append(line)
    except Timeout:
        pass

# Closing channel which has PTY has the effect of terminating
# any running processes started on that channel.
for host_out in output:
    host_out.client.close_channel(host_out.channel)
# Join is not strictly needed here as channel has already been closed and
# command has finished, but is safe to use regardless.
client.join(output)
# Can now read output up to when the channel was closed without blocking.
rest_of_stdout = list(output[0].stdout)

The documentation describes how to close the remote command. No thread pools should be used - channels cannot be shared (safely) across threads with parallel-ssh.

The duplicated output is because of the use of threads: there are multiple channels using the same socket across threads, and both of them will have output.

Can re-open if there is an issue with documented example as shown above.

Use the high level APIs in clients for writing to stdin, not OS pipes. OS pipes will not work.

asomers commented 2 years ago

What's wrong with my use of pipes? I'm not actually passing a pipe to parallel-ssh. I'm reading from the pipe and then writing with channel.stdin.write. Is there a problem with that? For my application, I must use pipes because the library that produces the data sends it to a file descriptor.

I don't think I can completely eliminate threads. That same library requires a thread. But that thread (_th_send) doesn't access the parallel-ssh channel. So why is that a problem?

Finally, while I did see the stuff about read_timeout in the docs, that approach won't work for me. I expect much more activity on stdin than on stdout. If I have to wait for a 1 second timeout on every loop iteration that would be too slow. That's why I use select to multiplex the file descriptors. Is there any way to use stdout in a truly non-blocking mode?
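
For reference, this is what "truly non-blocking" means at the descriptor level: a read either returns what is already buffered or reports immediately that nothing is available. The sketch below uses a plain `socket.socketpair()` in place of the SSH channel and a hypothetical `drain` helper; it does not use parallel-ssh's API and assumes a Unix-like system.

```python
import socket


def drain(sock, chunk=128):
    """Return whatever bytes are available right now, without waiting."""
    sock.setblocking(False)
    out = bytearray()
    while True:
        try:
            data = sock.recv(chunk)
        except BlockingIOError:
            break  # nothing more is buffered at the moment
        if not data:
            break  # the peer closed its side
        out += data
    return bytes(out)


if __name__ == "__main__":
    a, b = socket.socketpair()
    print(drain(a))  # nothing has been sent yet, so this returns at once
    b.sendall(b"0\n1\n")
    print(drain(a))
    a.close()
    b.close()
```

Combined with `select`, a drain like this lets a loop service the busy stdin side without ever waiting out a read timeout on the quiet stdout side.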

pkittenis commented 2 years ago

> Is there any way to use stdout in a truly non-blocking mode?

Sure, have a look at the poll method (SSHClient.poll).

If you're not passing data to/from an SSH channel with pipes, that's fine. Pipes are inherently blocking, so you can't use them for network communication of SSH traffic. Reading/writing from them inside a shell is fine, that's standard shell output.

The problematic code, as far as I can see without minimal code to reproduce that can be run anywhere, is this block here:

        with ThreadPoolExecutor() as executor:
            fut = executor.submit(self._th_send, pin)
            status = self._feed_pipes(pout, channel)

Channels cannot be used in a thread pool in this way. Things will break. You can call executor.submit, exit the thread pool block, and then do the SSH channel operations; that would be fine. Just don't share channel or client objects across threads. This may not be related, though, if the executor is not actually using the channel.

If a standalone piece of code that I can run locally can be shown I can take a look. Otherwise just guessing from looking at code.

The other thing is that you need gevent.select, not the standard Python select. All low level networking imports must come from gevent if you want low level functionality. But that's what SSHClient.poll does, so just use that.

Also these:

                        try:
                            channel.stdin.write(data)
                            channel.stdin.flush()

These are blocking calls. You need HostOutput.stdin.write as returned by run_command.

It sounds like a combination of a low read_timeout and a SSHClient.poll before writing will do what you want. But again, just guessing.

asomers commented 2 years ago

What's wrong with the example I posted above? Can you not run that locally?

pkittenis commented 2 years ago

I can tell it doesn't work from all the blocking calls: select, channel.write, etc. Once those are changed, does it work? The issue tracker is for actionable bugs, not for instructions on how to use the library.