AspectUnk / russh-sftp

SFTP subsystem supported server and client for Russh
Apache License 2.0

File transfer example #10

Open rukai opened 11 months ago

rukai commented 11 months ago

It would be nice to have an example of how best to transfer a file to/from the remote machine, e.g. a function like push_file(source_path: &str, dest_path: &str) that pushes a file from the local machine to the remote machine.

Doing this efficiently seems complicated, since I imagine we would want to read new bytes from the source while still sending earlier bytes to the destination. Additionally, this is a very common use case for SFTP, so I think an example of it would bring a lot of value.
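A minimal std-only sketch of that overlap, not russh-sftp's API: a background thread reads fixed-size chunks from any `Read` source into a bounded channel while the caller writes earlier chunks to the destination. The names `pipelined_copy`, `chunk_size` and `depth` are hypothetical; in an async russh-sftp program the source and destination would presumably be async file handles, with tokio tasks in place of the thread.

```rust
use std::io::{self, Read, Write};
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: copies `src` into `dst`, reading the next chunk on a
/// background thread while the current chunk is being written. At most
/// `depth` chunks wait in the channel, so memory stays near depth * chunk_size.
fn pipelined_copy<R, W>(mut src: R, dst: &mut W, chunk_size: usize, depth: usize) -> io::Result<u64>
where
    R: Read + Send + 'static,
    W: Write,
{
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    let (tx, rx) = mpsc::sync_channel::<io::Result<Vec<u8>>>(depth);

    // Reader side: keeps filling chunks until EOF or an error.
    let reader = thread::spawn(move || loop {
        let mut buf = vec![0u8; chunk_size];
        match src.read(&mut buf) {
            Ok(0) => break, // EOF: dropping `tx` ends the writer loop below
            Ok(n) => {
                buf.truncate(n);
                if tx.send(Ok(buf)).is_err() {
                    break; // writer side stopped early
                }
            }
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => {
                let _ = tx.send(Err(e)); // forward the error to the writer
                break;
            }
        }
    });

    // Writer side: runs concurrently with the reader thread.
    let mut total = 0u64;
    for chunk in rx {
        let chunk = chunk?;
        dst.write_all(&chunk)?;
        total += chunk.len() as u64;
    }
    dst.flush()?;
    reader.join().expect("reader thread panicked");
    Ok(total)
}
```

Bounding the channel keeps memory use predictable for large files; a plain `std::io::copy` would be simpler but does not overlap reads with writes.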

rukai commented 11 months ago

I'm trying to replace a system where I pipe stdout to/from dd over ssh to transfer files. It takes about 2-3s to transfer a 1MB file that way.

Trying a naive implementation of

    pub async fn pull_file(&self, source: &Path, dest: &Path) {
        let task = format!("pulling file from {}:{source:?} to {dest:?}", self.address);
        tracing::info!("{task}");

        let mut channel = self.session.channel_open_session().await.unwrap();
        channel.request_subsystem(true, "sftp").await.unwrap();
        let sftp = SftpSession::new(channel.into_stream()).await.unwrap();
        let mut file = sftp.open(source.to_str().unwrap()).await.unwrap();

        let mut dest = File::create(dest)
            .await
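            .map_err(|e| anyhow!(e).context(format!("Failed to create {dest:?}")))
            .unwrap();
        // Read the whole remote file into memory, then write it to the local file.
        let mut read = vec![];
        file.read_to_end(&mut read)
            .await
            .unwrap_or_else(|e| panic!("{task} failed to read from remote server with {e:?}"));
        tokio::io::AsyncWriteExt::write_all(&mut dest, &read)
            .await
            .unwrap_or_else(|e| panic!("{task} failed to write to local disk with {e:?}"));
        // Flush so pending writes complete (and errors surface) before the tokio File is dropped.
        tokio::io::AsyncWriteExt::flush(&mut dest)
            .await
            .unwrap_or_else(|e| panic!("{task} failed to flush local file with {e:?}"));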
    }

or

    pub async fn pull_file(&self, source: &Path, dest: &Path) {
        let task = format!("pulling file from {}:{source:?} to {dest:?}", self.address);
        tracing::info!("{task}");

        let mut channel = self.session.channel_open_session().await.unwrap();
        channel.request_subsystem(true, "sftp").await.unwrap();
        let sftp = SftpSession::new(channel.into_stream()).await.unwrap();
        let mut file = sftp.open(source.to_str().unwrap()).await.unwrap();

        let mut dest = File::create(dest)
            .await
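            .map_err(|e| anyhow!(e).context(format!("Failed to create {dest:?}")))
            .unwrap();

        // Stream the remote file through a fixed 1 MiB buffer instead of
        // holding the whole file in memory.
        let mut bytes = vec![0u8; 1024 * 1024];
        loop {
            let read_count = file
                .read(&mut bytes[..])
                .await
                .unwrap_or_else(|e| panic!("{task} failed to read from remote server with {e:?}"));
            if read_count == 0 {
                break; // EOF
            }
            tokio::io::AsyncWriteExt::write_all(&mut dest, &bytes[0..read_count])
                .await
                .unwrap_or_else(|e| panic!("{task} failed to write to local disk with {e:?}"));
        }
        // Flush so pending writes complete (and errors surface) before the tokio File is dropped.
        tokio::io::AsyncWriteExt::flush(&mut dest)
            .await
            .unwrap_or_else(|e| panic!("{task} failed to flush local file with {e:?}"));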
    }

is slower than my original dd approach, taking about 5-10 seconds. I tried a 4KB buffer instead of a 1MB buffer and that took 60s.
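A back-of-the-envelope model for these buffer-size numbers, assuming each read is a separate request that must receive its response before the next one is sent, and that 1MB = 1024 × 1024 bytes and 4KB = 4096 bytes. The helper names are hypothetical, and this is a model, not a measurement of russh-sftp itself.

```rust
/// Round trips needed to move `file_size` bytes when each request carries at
/// most `chunk_size` bytes and requests are issued strictly one at a time.
fn sequential_requests(file_size: u64, chunk_size: u64) -> u64 {
    assert!(chunk_size > 0, "chunk_size must be positive");
    (file_size + chunk_size - 1) / chunk_size // ceiling division
}

/// Rough transfer time if every request waits one round trip (bandwidth ignored).
fn sequential_time_secs(file_size: u64, chunk_size: u64, rtt_secs: f64) -> f64 {
    sequential_requests(file_size, chunk_size) as f64 * rtt_secs
}

/// Per-request latency implied by an observed total time under the same model.
fn implied_rtt_secs(file_size: u64, chunk_size: u64, observed_secs: f64) -> f64 {
    observed_secs / sequential_requests(file_size, chunk_size) as f64
}
```

Under this model, 60 s for a 1MB file read in 4KB pieces corresponds to 256 sequential requests at about 0.23 s each, which would mean per-request latency, not bandwidth, dominates; fewer, larger requests or several requests in flight at once are the levers.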

UVUUUVUU commented 10 months ago

Hi! Have you tried transferring multiple files? The struct SftpSession doesn't implement the Clone trait, so how can we do something like this?

    // Does not compile: SftpSession is not Clone.
    let sftp = sftp.clone();
    let handle = task::spawn(async move {
        download_file(sftp, remote_path, local_path).await;
    });

AspectUnk commented 10 months ago

@rukai I will try to test the speed as soon as possible and provide an example. I also think we need to add a native implementation of this to the high-level API.

@UVUUUVUU At the moment this is a bottleneck: internally, a mutex is used to process requests, which prevents concurrent tasks from working well. As a temporary solution, you can wrap the session in an Arc with a Mutex. This will be fixed soon.

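    use std::sync::Arc;
    use tokio::sync::Mutex; // assumption: an async-aware Mutex, since the lock is held across .await

    let session = SftpSession::new(channel.into_stream()).await?;
    let arc_session = Arc::new(Mutex::new(session));
    let sftp = arc_session.clone();
    let handle = task::spawn(async move {
        // download_file (not shown) would call sftp.lock().await before using the session
        download_file(sftp, remote_path, local_path).await;
    });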
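
A runnable std-only analogue of the Arc-with-Mutex workaround, using threads and `std::sync::Mutex` in place of tokio tasks; `FakeSession` and `download_all` are hypothetical stand-ins for the real session and a `download_file` helper. Because each worker holds the lock for its whole download, the downloads are serialized, which is why this is only a temporary fix.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a session whose methods need `&mut self`.
/// It only records which paths were "downloaded".
struct FakeSession {
    downloaded: Vec<String>,
}

impl FakeSession {
    fn download(&mut self, remote_path: &str) {
        self.downloaded.push(remote_path.to_string());
    }
}

/// Shares one session between several worker threads. Each worker locks the
/// session for the duration of its download, so only one runs at a time.
fn download_all(session: Arc<Mutex<FakeSession>>, paths: Vec<String>) {
    let handles: Vec<_> = paths
        .into_iter()
        .map(|path| {
            let session = Arc::clone(&session); // cheap pointer clone, same session
            thread::spawn(move || {
                session.lock().unwrap().download(&path);
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}
```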
UVUUUVUU commented 10 months ago

OK, thanks for your answer! Looking forward to your future work.

UVUUUVUU commented 10 months ago

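> @UVUUUVUU At the moment this is a bottleneck, because a mutex is used internally to process requests, which prevents multitasking from working properly. As a temporary solution, you can use Arc with Mutex. This issue will be fixed soon.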

How do you want to solve this? Could you give some suggestions?

AspectUnk commented 10 months ago

> How do you want to solve this? Could you give some suggestions?

I think this problem can be solved with a concurrent hash map, because it avoids blocking. The main task is to create a non-blocking list of pending requests and their responses. flurry may solve this problem; I like their implementation.

Oh, by the way, we could also just use a broadcast channel, where each consumer only picks up the response to its own request.
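A std-only sketch of the non-blocking request/response table, assuming a single reader loop that sees every incoming response: each SFTP request carries a request id that the server echoes in its response, so a map from id to a per-request channel lets many callers wait at once. This swaps in a plain `Mutex<HashMap>` that is held only to insert or remove one entry, rather than flurry or a broadcast channel; `Dispatcher`, `register` and `dispatch` are hypothetical names, not russh-sftp internals.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Routes each response to the caller that sent the matching request id.
/// The lock is never held while a caller waits for its response.
struct Dispatcher<T> {
    next_id: AtomicU32,
    pending: Mutex<HashMap<u32, Sender<T>>>,
}

impl<T> Dispatcher<T> {
    fn new() -> Self {
        Self { next_id: AtomicU32::new(1), pending: Mutex::new(HashMap::new()) }
    }

    /// Allocates a fresh request id and returns it with the receiver on which
    /// its response will arrive; the caller then sends a packet carrying `id`.
    fn register(&self) -> (u32, Receiver<T>) {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        let (tx, rx) = channel();
        self.pending.lock().unwrap().insert(id, tx);
        (id, rx)
    }

    /// Called by the reader loop for each incoming response.
    /// Returns false if nobody was waiting for `id`.
    fn dispatch(&self, id: u32, response: T) -> bool {
        // The guard is dropped at the end of this statement, before sending.
        let waiter = self.pending.lock().unwrap().remove(&id);
        match waiter {
            Some(tx) => tx.send(response).is_ok(),
            None => false,
        }
    }
}
```

With a broadcast channel every waiter would see every response and discard the ones whose id is not its own; the map delivers each response to exactly one waiter, even when responses arrive out of order.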

AspectUnk commented 4 months ago

The optimization problem should now be resolved by PR #30, published as 2.0.0 🚀. Now we have benchmarks and an "example".

> I'm trying to replace a system where I pipe stdout to/from dd over ssh to transfer files. It takes about 2-3s to transfer a 1MB file that way.

@rukai It would be nice to get feedback from you.