ParallelSSH / ssh2-python

Python bindings for libssh2 C library.
https://parallel-ssh.org
GNU Lesser General Public License v2.1
228 stars 70 forks source link

`SocketRecvError` on resuming upload of partially uploaded files #182

Closed NamamiShanker closed 1 year ago

NamamiShanker commented 1 year ago

Bug reports

I am writing a program that will upload files to a CrushFTP server through SFTP and retry on connection loss. Below is my Connection class for wrapping ssh2-python library, and my retry logic to resume upload on connection loss.

Steps to reproduce:

  1. Example code that produces error. The Connection class that wraps ssh2-python

    class Connection:
    
    def __init__(self, host: str, username: str = None, password: str = None, port: int = 22, retries: int = 5):
        self.host = host
        self.username = username
        self.password = password
        self.port = port
        logger.info(f"Attempting to connect to {host}")
        self.connected = False
        while not self.connected and retries:
            try:
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.connect((host, port))
                self.connected = True
            except Exception as e:
                if retries:
                    logger.error(f"SFTP Conncetion failed : Retrying : Retries left {retries}")
                    time.sleep(5)
                    retries -= 1
                else:
                    logger.exception("Unable to establish connection to the host")
                    raise e
        logger.info(f"Connected to {host}")
        self.session = Session()
        self.session.set_timeout(5000)
        self.session.handshake(self.sock)
        self.session.userauth_password(username, password)
        self.sftp = self.session.sftp_init()
        logger.info("Opened sftp connection")
        self.mode = LIBSSH2_SFTP_S_IRUSR | LIBSSH2_SFTP_S_IWUSR | LIBSSH2_SFTP_S_IRGRP | LIBSSH2_SFTP_S_IROTH
    
        self.f_flags_append = LIBSSH2_FXF_APPEND
        self.f_flags_write = LIBSSH2_FXF_WRITE
        self.f_flags_read = LIBSSH2_FXF_READ
    
    def reconnect(self):
        self.close()
        self.__init__(self.host, self.username, self.password, self.port)
        return self
    
    def put(self, localpath: Path, remotepath: str, update_callback=None, set_callback=None):
        if not localpath.is_file():
            logger.exception(f"{localpath} is not a file")
            return
        logger.info(f"Uploading {localpath} to {remotepath}")
    
        try:
            with self.sftp.open(remotepath, self.f_flags_read, self.mode) as remote_fh:
                if localpath.stat().st_size == remote_fh.fstat().filesize:
                    logger.info(f"{localpath} already exists on {remotepath}")
                    return remote_fh.fstat()
        except ssh2.exceptions.SFTPProtocolError as e:
            with self.sftp.open(remotepath, self.f_flags_write, self.mode) as remote_fh:
                pass
            logger.info(f"Creating remote file {remotepath}")
        with open(localpath, 'rb', common.BUFSIZE) as local_fh, self.sftp.open(remotepath, self.f_flags_append, self.mode) as remote_fh:
            remote_size = remote_fh.fstat().filesize
            if(remote_size > 0):
                logger.info(f"File {localpath} partially exists at {remotepath}: Existing file size {remote_size} bytes : Remaining file size {localpath.stat().st_size - remote_size} bytes")
                if(set_callback):
                    set_callback(remote_size, localpath.stat().st_size)
            logger.info(f"Seek pointer: {remote_fh.tell64()}")
            remote_fh.seek64(remote_size)
            local_fh.seek(remote_size)
            while data := local_fh.read(common.BUFSIZE):
                remote_fh.write(data)
                if update_callback:
                    update_callback(data.__sizeof__(), localpath.stat().st_size)
        return remote_fh.fstat()
    
    def close(self):
        logger.info("Closing connection")
        self.session.disconnect()
        self.sock.close()
        logger.info("Connection closed")

The retry logic, where functions _show_updates and _set_update are just tqdm updation callbacks:

def _transfer_file(self, file: Path, update_callback=_show_updates, set_callback=_set_update) -> int:
    """Transfer a single file and return the number of bytes transferred"""
    self.current_file = file
    now = datetime.now()
    while True:
        try:
            transferred_file = self.client.put(file, self.remote_folder_path + file.name, update_callback=update_callback, set_callback=set_callback)

        except exceptions.Timeout as e:
            logger.error("Connection lost : Retrying")
            self.client = self.client.reconnect()
        except exceptions.SocketSendError as e:
            logger.error(f"Connection lost : Retrying in {self.retry_time_secs} seconds")
            time.sleep(self.retry_time_secs)
            self.client = self.client.reconnect()
        except exceptions.SocketRecvError as e:
            logger.error(f"SocketRecvError: Connection lost : Retrying in {self.retry_time_secs} seconds")
            time.sleep(self.retry_time_secs)
            self.client = self.client.reconnect()
        except Exception as e:
            logger.error(f"Error transferring file {file.name}")
            raise e
        else:
            logging.info(f"File {file.resolve()} : Size {transferred_file.filesize} bytes : Started {now} : Finished {datetime.now()}")

            return transferred_file.filesize
  1. Stack trace or error messages.
    INFO 2022-09-05 16:11:33,009 - Attempting to connect to 192.168.1.79
    INFO 2022-09-05 16:11:33,009 - Attempting to connect to 192.168.1.79
    INFO 2022-09-05 16:11:33,010 - Connected to 192.168.1.79
    INFO 2022-09-05 16:11:33,099 - Opened sftp connection
    INFO 2022-09-05 16:11:33,099 - Connected to 192.168.1.79
    INFO 2022-09-05 16:11:33,112 - Uploading testfiles/archive.part01.rar to /archive.part01.rar                                                                                                                                                                                                                       
    INFO 2022-09-05 16:11:33,122 - File testfiles/archive.part01.rar partially exists at /archive.part01.rar: Existing file size 9175040 bytes : Remaining file size 95682560 bytes                                                                                                                                    
    INFO 2022-09-05 16:11:33,123 - Seek pointer: 0                                                                                                                                                                                                                                                                     
    24%|██████████████████████████████████████████████████████████████▏                                                                                                                                                                                                       | 24.9M/105M [00:09<00:49, 1.63Mbytes/s]
    24%|██████████████████████████████████████████████████████████████▏                                                                                                                                                                                                       | 24.9M/105M [00:09<00:31, 2.56Mbytes/s]
    Traceback (most recent call last):
    File "/home/namami/dev/ftp/ft_manager/sftp_manager/ssh2_sftp.py", line 81, in put
    remote_fh.write(data)
    File "ssh2/sftp_handle.pyx", line 292, in ssh2.sftp_handle.SFTPHandle.write
    File "ssh2/utils.pyx", line 148, in ssh2.utils.handle_error_codes
    ssh2.exceptions.Timeout

    Expected behaviour: The script should have resumed upload of the partially uploaded file.

Actual behaviour: I get a SocketRecvError.

Additional info: libssh2 v1.0.0:

When I run the script again, the uploading resumes without any errors. How should I handle the different errors so that the uploading of files can be resumed.

pkittenis commented 1 year ago

Hi there,

Thanks for the interest.

Not seeing an issue with library here. Timeout is set at self.session.set_timeout(5000). This causes file transfers to timeout after 5sec. The library correctly raises ssh2.exceptions.Timeout after 5sec of transferring.

If timeouts are not wanted, set_timeout should not be called.

To be closed unless an issue with library is shown.

pkittenis commented 1 year ago

Not seeing an issue. Timeout raised correctly after set_timeout is used.