AspectUnk / russh-sftp

SFTP subsystem support (server and client) for Russh
Apache License 2.0

Should `russh-sftp::server::process_handler` call flush on every write? #16

Closed michaelfortunato closed 7 months ago

michaelfortunato commented 8 months ago

Question

Hi, I am currently writing an ssh daemon and trying to support the sftp subsystem. I noticed that russh-sftp::server::process_handler does not call flush each time it writes a response and was wondering if it should.

Background - Spawning Handlers

For each sftp subsystem request, I spawn a custom executable, ./sftp, which handles the request from then on; my daemon communicates with it via its stdin and stdout. The daemon forwards bytes from the socket to the sftp child process's stdin and forwards bytes from the child process's stdout back to the socket (see async_io_copy_loop). See the snippet here:

    async fn subsystem_request(
        mut self,
        channel_id: ChannelId,
        name: &str,
        mut session: Session,
    ) -> Result<(Self, Session), Self::Error> {
        info!("Subsystem: {}", name);
        if cfg!(feature = "sftp") && name == "sftp" {
            let channel = self.channels.lock().unwrap().remove(&channel_id).unwrap();
            let handler_path = "./sftp";
            let mut child = tokio::process::Command::new(handler_path)
                // Allocate two pipes for stdin and stdout through which the parent can
                // talk to the child.
                .stdin(std::process::Stdio::piped())
                .stdout(std::process::Stdio::piped())
                .spawn()
                .expect("failed to spawn the sftp handler");
            let child_stdin = child.stdin.take().unwrap();
            let mut child_stdout = child.stdout.take().unwrap();

            let (connection_reader, connection_writer) = tokio::io::split(channel.into_stream());
            tokio::spawn(
                async_io_copy_loop(
                    connection_reader,
                    connection_writer,
                    child_stdout,
                    child_stdin,
                    child.id().unwrap() as u64,
                )
                .instrument(info_span!("SFTP I/O Copy Loop", client=%self.client_addr)),
            );
            info!("IO Copy thread launched");
            session.channel_success(channel_id);
        } else {
            session.channel_failure(channel_id);
        }

        Ok((self, session))
    }

Background - Implementing The SFTP executable

To implement the sftp executable that handles the connection, I wrote the following.

use async_trait::async_trait;
use flexi_logger::LevelFilter;
use log::*;
use russh_sftp::protocol::{
    Data, File, FileAttributes, Handle, Name, OpenFlags, Status, StatusCode, Version,
};
use russh_sftp::server::run;
use russh_sftp::server::Handler;
use std::collections::HashMap;
use std::{
    pin::Pin,
    process::ExitCode,
    task::{Context, Poll},
};
use tokio::io::{stdin, stdout, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
/// Combines a reader and a writer into a single stream that
/// implements both `AsyncRead` and `AsyncWrite`.
struct IOWrapper<R, W>
where
    R: AsyncRead,
    W: AsyncWrite,
{
    reader: R,
    writer: W,
}
impl<R, W> AsyncRead for IOWrapper<R, W>
where
    R: AsyncRead + Unpin,  // Ensure R implements AsyncRead and Unpin
    W: AsyncWrite + Unpin, // Ensure W implements AsyncWrite and Unpin
{
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        let self_mut = self.get_mut();
        let reader_pin = std::pin::Pin::new(&mut self_mut.reader);
        reader_pin.poll_read(cx, buf) // call poll_read on the pinned reader
    }
}

impl<R, W> AsyncWrite for IOWrapper<R, W>
where
    R: AsyncRead + Unpin,  // Ensure R implements AsyncRead and Unpin
    W: AsyncWrite + Unpin, // Ensure W implements AsyncWrite and Unpin
{
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, std::io::Error>> {
        let self_mut = self.get_mut();
        let mut writer_pin = std::pin::Pin::new(&mut self_mut.writer);
        info!("poll_write called!");
        info!("Contents: {:?}", buf);
        let res = writer_pin.as_mut().poll_write(cx, buf);
        // writer_pin.poll_flush(cx);
        res
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        let self_mut = self.get_mut();
        let writer_pin = Pin::new(&mut self_mut.writer);
        info!("poll_flush called!");
        writer_pin.poll_flush(cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        let self_mut = self.get_mut();
        let writer_pin = Pin::new(&mut self_mut.writer);
        info!("poll_shutdown called!");
        writer_pin.poll_shutdown(cx)
    }
}

struct SftpHandler;

/// Server handler for each client. This is `async_trait`
#[async_trait]
impl Handler for SftpHandler {
    /// The type must have an Into<StatusCode>
    /// implementation because a response must be sent
    /// to any request, even if completed by error.
    type Error = StatusCode;

    /// Called by the handler when the packet is not implemented
    fn unimplemented(&self) -> Self::Error {
        StatusCode::OpUnsupported
    }

    /// The default is to send an SSH_FXP_VERSION response with
    /// the protocol version and ignore any extensions.
    #[allow(unused_variables)]
    async fn init(
        &mut self,
        version: u32,
        extensions: HashMap<String, String>,
    ) -> Result<Version, Self::Error> {
        info!("DIDID I GET HIT?????");
        Ok(Version::new())
    }

    /// Called on SSH_FXP_OPEN
    #[allow(unused_variables)]
    async fn open(
        &mut self,
        id: u32,
        filename: String,
        pflags: OpenFlags,
        attrs: FileAttributes,
    ) -> Result<Handle, Self::Error> {
        Err(self.unimplemented())
    }

    /// Called on SSH_FXP_CLOSE.
    /// The status can be returned as Ok or as Err
    #[allow(unused_variables)]
    async fn close(&mut self, id: u32, handle: String) -> Result<Status, Self::Error> {
        Err(self.unimplemented())
    }

    /// Called on SSH_FXP_READ
    #[allow(unused_variables)]
    async fn read(
        &mut self,
        id: u32,
        handle: String,
        offset: u64,
        len: u32,
    ) -> Result<Data, Self::Error> {
        Err(self.unimplemented())
    }

    /// Called on SSH_FXP_WRITE
    #[allow(unused_variables)]
    async fn write(
        &mut self,
        id: u32,
        handle: String,
        offset: u64,
        data: Vec<u8>,
    ) -> Result<Status, Self::Error> {
        Err(self.unimplemented())
    }
}

#[tokio::main(worker_threads = 1)]
async fn main() -> ExitCode {
    let stream = IOWrapper {
        reader: stdin(),
        writer: stdout(),
    };
    let handler = SftpHandler;
    use flexi_logger::Logger;
    use flexi_logger::{writers::FileLogWriter, FileSpec};
    let _handle = Logger::try_with_str("trace")
        .unwrap()
        .log_to_file(FileSpec::default().directory("./log").suppress_timestamp())
        .start()
        .unwrap();
    russh_sftp::server::run(stream, handler).await;
    ExitCode::SUCCESS
}

As you can see, I created a new type, IOWrapper, which implements AsyncRead and AsyncWrite and is just a small shim over tokio's stdin and stdout implementations.

The Problem

The problem is that when my sftp executable writes to stdout to respond to requests, the bytes are buffered, so they never actually get read by my ssh daemon. As a result, my ssh daemon's async_io_copy_loop waits indefinitely for the child process's stdout to emit data, but it never does. I found that patching russh-sftp::server::process_handler to flush after every write fixed the problem, as the bytes were then actually written to stdout.

Misc - Update to russh-sftp::server::process_handler

I added one line to russh-sftp::server::process_handler to flush after every write. I can open a pull request if this implementation looks good.

async fn process_handler<H, S>(stream: &mut S, handler: &mut H) -> Result<(), Error>
where
    H: Handler + Send,
    S: AsyncRead + AsyncWrite + Unpin,
{
    let mut bytes = read_packet(stream).await?;

    let response = match Packet::try_from(&mut bytes) {
        Ok(request) => process_request(request, handler).await,
        Err(_) => Packet::error(0, StatusCode::BadMessage),
    };

    let packet = Bytes::try_from(response)?;
    stream.write_all(&packet).await?;
    stream.flush().await?; // Flush added here
    Ok(())

Conclusion

Should we be flushing on write in process_handler or is my design, whereby I spawn a handler process for each client and forward bytes to its stdio, fundamentally flawed?

Thanks!

AspectUnk commented 7 months ago

Hello. Sorry for the late reply. Yes, a flush is usually unnecessary in networking, but in your case it is necessary. Since the crate should work with any stream, please make a PR and become a contributor. Thank you.

michaelfortunato commented 7 months ago

Is there any concern about doing a syscall (the flush) on every invocation of process_handler? I assume that in current usages of russh-sftp the buffer is effectively flushed on every request anyway, because most of them use the channel struct from russh as their stream.

AspectUnk commented 7 months ago

As I said above, flush only has an effect on file-like I/O. Flush implementations for network streams make no sense: they usually do no work and simply return success (see the russh call/implementation, and TcpStream). In your case, where you are working with file-like I/O, the flush call is necessary because you need immediate access to the data, so you have nothing to worry about.

AspectUnk commented 7 months ago

Fixed in PR #18