dizda / fast-socks5

Fast SOCKS5 client/server implementation written in Rust async/.await (with tokio)
https://anyip.io/
MIT License

feature request: ability to track bytes sent and received #20

Closed · GlenDC closed this 2 years ago

GlenDC commented 2 years ago

This is in regards to #15: I managed to make that all work, but (please check the code linked there) I had to make it catch ErrorKind::NotConnected and ErrorKind::ConnectionReset.

This essentially means that we do have access to the bytes sent/received (written/read). However, even when there is no error, that number always seems to be (0, 0) (I use it as a router, but the same happens when running a regular server as per your example).

I tried to write copy_bidirectional myself (copying the tokio code and catching the errors):

use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

use fast_socks5::ready;

use std::future::Future;
use std::io::{self, ErrorKind};
use std::pin::Pin;
use std::task::{Context, Poll};

enum TransferState {
    Running(CopyBuffer),
    ShuttingDown(u64),
    Done(u64),
}

struct CopyBidirectional<'a, A: ?Sized, B: ?Sized> {
    a: &'a mut A,
    b: &'a mut B,
    a_to_b: TransferState,
    b_to_a: TransferState,
}

fn transfer_one_direction<A, B>(
    cx: &mut Context<'_>,
    state: &mut TransferState,
    r: &mut A,
    w: &mut B,
) -> Poll<io::Result<u64>>
where
    A: AsyncRead + AsyncWrite + Unpin + ?Sized,
    B: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
    let mut r = Pin::new(r);
    let mut w = Pin::new(w);

    loop {
        match state {
            TransferState::Running(buf) => {
                let count = ready!(buf.poll_copy(cx, r.as_mut(), w.as_mut()))?;
                *state = TransferState::ShuttingDown(count);
            }
            TransferState::ShuttingDown(count) => {
                match ready!(w.as_mut().poll_shutdown(cx)) {
                    Ok(_) => (),
                    Err(err) => match err.kind() {
                        ErrorKind::NotConnected | ErrorKind::ConnectionReset => (),
                        _ => return Poll::Ready(Err(err)),
                    },
                };

                *state = TransferState::Done(*count);
            }
            TransferState::Done(count) => return Poll::Ready(Ok(*count)),
        }
    }
}

impl<'a, A, B> Future for CopyBidirectional<'a, A, B>
where
    A: AsyncRead + AsyncWrite + Unpin + ?Sized,
    B: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
    type Output = io::Result<(u64, u64)>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Unpack self into mut refs to each field to avoid borrow check issues.
        let CopyBidirectional {
            a,
            b,
            a_to_b,
            b_to_a,
        } = &mut *self;

        let a_to_b = transfer_one_direction(cx, a_to_b, &mut *a, &mut *b)?;
        let b_to_a = transfer_one_direction(cx, b_to_a, &mut *b, &mut *a)?;

        // It is not a problem if ready! returns early because transfer_one_direction for the
        // other direction will keep returning TransferState::Done(count) in future calls to poll
        let a_to_b = ready!(a_to_b);
        let b_to_a = ready!(b_to_a);

        Poll::Ready(Ok((a_to_b, b_to_a)))
    }
}

/// Copies data in both directions between `a` and `b`.
///
/// This function returns a future that will read from both streams,
/// writing any data read to the opposing stream.
/// This happens in both directions concurrently.
///
/// If an EOF is observed on one stream, [`shutdown()`] will be invoked on
/// the other, and reading from that stream will stop. Copying of data in
/// the other direction will continue.
///
/// The future will complete successfully once both directions of communication has been shut down.
/// A direction is shut down when the reader reports EOF,
/// at which point [`shutdown()`] is called on the corresponding writer. When finished,
/// it will return a tuple of the number of bytes copied from a to b
/// and the number of bytes copied from b to a, in that order.
///
/// [`shutdown()`]: crate::io::AsyncWriteExt::shutdown
///
/// # Errors
///
/// The future will immediately return an error if any IO operation on `a`
/// or `b` returns an error. Some data read from either stream may be lost (not
/// written to the other stream) in this case.
///
/// # Return value
///
/// Returns a tuple of bytes copied `a` to `b` and bytes copied `b` to `a`.
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub async fn copy_bidirectional<A, B>(a: &mut A, b: &mut B) -> Result<(u64, u64), std::io::Error>
where
    A: AsyncRead + AsyncWrite + Unpin + ?Sized,
    B: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
    CopyBidirectional {
        a,
        b,
        a_to_b: TransferState::Running(CopyBuffer::new()),
        b_to_a: TransferState::Running(CopyBuffer::new()),
    }
    .await
}

#[derive(Debug)]
struct CopyBuffer {
    read_done: bool,
    need_flush: bool,
    pos: usize,
    cap: usize,
    amt: u64,
    buf: Box<[u8]>,
}

impl CopyBuffer {
    pub(super) fn new() -> Self {
        Self {
            read_done: false,
            need_flush: false,
            pos: 0,
            cap: 0,
            amt: 0,
            buf: vec![0; 2048].into_boxed_slice(),
        }
    }

    pub(super) fn poll_copy<R, W>(
        &mut self,
        cx: &mut Context<'_>,
        mut reader: Pin<&mut R>,
        mut writer: Pin<&mut W>,
    ) -> Poll<io::Result<u64>>
    where
        R: AsyncRead + ?Sized,
        W: AsyncWrite + ?Sized,
    {
        loop {
            // If our buffer is empty, then we need to read some data to
            // continue.
            if self.pos == self.cap && !self.read_done {
                let me = &mut *self;
                let mut buf = ReadBuf::new(&mut me.buf);

                match reader.as_mut().poll_read(cx, &mut buf) {
                    Poll::Ready(Ok(_)) => (),
                    Poll::Ready(Err(err)) => match err.kind() {
                        ErrorKind::NotConnected | ErrorKind::ConnectionReset => (),
                        _ => return Poll::Ready(Err(err)),
                    },
                    Poll::Pending => {
                        // Try flushing when the reader has no progress to avoid deadlock
                        // when the reader depends on buffered writer.
                        if self.need_flush {
                            match ready!(writer.as_mut().poll_flush(cx)) {
                                Ok(_) => (),
                                Err(err) => match err.kind() {
                                    ErrorKind::NotConnected | ErrorKind::ConnectionReset => (),
                                    _ => return Poll::Ready(Err(err)),
                                },
                            };
                            self.need_flush = false;
                        }

                        return Poll::Pending;
                    }
                }

                let n = buf.filled().len();
                if n == 0 {
                    self.read_done = true;
                } else {
                    self.pos = 0;
                    self.cap = n;
                }
            }

            // If our buffer has some data, let's write it out!
            while self.pos < self.cap {
                let me = &mut *self;
                let i = match ready!(writer.as_mut().poll_write(cx, &me.buf[me.pos..me.cap])) {
                    Ok(i) => i,
                    Err(err) => match err.kind() {
                        // Note: treating a reset/disconnect as one byte
                        // written avoids the WriteZero error below, but it
                        // inflates `pos` and `amt` even though nothing was
                        // actually sent.
                        ErrorKind::NotConnected | ErrorKind::ConnectionReset => 1,
                        _ => return Poll::Ready(Err(err)),
                    },
                };
                if i == 0 {
                    return Poll::Ready(Err(io::Error::new(
                        io::ErrorKind::WriteZero,
                        "write zero byte into writer",
                    )));
                } else {
                    self.pos += i;
                    self.amt += i as u64;
                    self.need_flush = true;
                }
            }

            // If pos larger than cap, this loop will never stop.
            // In particular, user's wrong poll_write implementation returning
            // incorrect written length may lead to thread blocking.
            debug_assert!(
                self.pos <= self.cap,
                "writer returned length larger than input slice"
            );

            // If we've written all the data and we've seen EOF, flush out the
            // data and finish the transfer.
            if self.pos == self.cap && self.read_done {
                match ready!(writer.as_mut().poll_flush(cx)) {
                    Ok(_) => (),
                    Err(err) => match err.kind() {
                        ErrorKind::NotConnected | ErrorKind::ConnectionReset => (),
                        _ => return Poll::Ready(Err(err)),
                    },
                };
                return Poll::Ready(Ok(self.amt));
            }
        }
    }
}
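
For context, a sketch of how a handler might drive the copy above; the handler and stream names here are illustrative placeholders, not my actual gateway code:

// Hypothetical relay handler: runs the custom copy_bidirectional above and
// logs the byte counts it returns on success.
async fn relay(
    mut inbound: tokio::net::TcpStream,
    mut outbound: tokio::net::TcpStream,
) -> std::io::Result<()> {
    let (sent, received) = copy_bidirectional(&mut inbound, &mut outbound).await?;
    log::info!("socket transfer closed ({}, {})", sent, received);
    Ok(())
}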

However, this still has the same issue: the returned bytes read/written are (0, 0).

So it is not as simple as that. Here are the logs from a run:

[2022-05-05T22:26:57Z INFO  fast_socks5::server] incoming connection from peer 127.0.0.1:64345 @ 127.0.0.1:1337
[2022-05-05T22:26:57Z DEBUG proxy_gateway] handle incoming socket
[2022-05-05T22:26:57Z DEBUG proxy_gateway] upgrade incoming socket as socks5 proxy
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Handshake headers: [version: 5, methods len: 3]
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] methods supported sent by the client: [0, 1, 2]
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Reply with method AuthenticationMethod::Password (2)
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Auth: [version: 1, user len: 9]
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] username bytes: [112, 117, 112, 112, 101, 116, 101, 101, 114]
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Auth: [pass len: 3]
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] password bytes: [98, 97, 114]
[2022-05-05T22:26:57Z INFO  fast_socks5::server] User `puppeteer` logged successfully.
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Request: [version: 5, command: 1, rev: 0, address_type: 1]
[2022-05-05T22:26:57Z DEBUG fast_socks5::util::target_addr] Address type `IPv4`
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Request target is 185.199.108.153:443
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Domain won't be resolved because `dns_resolve`'s config has been turned off.
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Connected to remote destination
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Wrote success
[2022-05-05T22:26:58Z INFO  fast_socks5::server] transfer closed (615, 11010)
[2022-05-05T22:26:58Z DEBUG proxy_gateway] log original target address of incoming socket
[2022-05-05T22:26:58Z DEBUG proxy_gateway] resolve target dns for incoming socket
[2022-05-05T22:26:58Z INFO  fast_socks5::client] Connected @ 127.0.0.1:1338
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] Send version and method len [5, 2]
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] client auth methods supported: [0, 2]
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] Socks version (5), method chosen: 2.
[2022-05-05T22:26:58Z INFO  fast_socks5::client] Password will be used
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] Auth: [version: 1, is_success: 0]
[2022-05-05T22:26:58Z INFO  fast_socks5::client] Requesting headers `Some(Ip(185.199.108.153:443))`...
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] TargetAddr::IpV4
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] addr ip [185, 199, 108, 153]
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] Bytes long version: [5, 1, 0, 1, 185, 199, 108, 153, 1, 187, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] Bytes shorted version: [5, 1, 0, 1, 185, 199, 108, 153, 1, 187]
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] Padding: 10
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] Reply received: [version: 5, reply: 0, rsv: 0, address_type: 1]
[2022-05-05T22:26:58Z DEBUG fast_socks5::util::target_addr] Address type `IPv4`
[2022-05-05T22:26:58Z INFO  fast_socks5::client] Remote server bind on 127.0.0.1:0.
[2022-05-05T22:26:58Z INFO  proxy_gateway] socket transfer closed (0, 0)
"2022-05-05 22:26:57.929731 UTC","185.199.108.153:443","127.0.0.1:1338","","",0

From the logs we can see that the size is returned for the initial part of the socks5 stream (the header exchange), but once we do the actual transfer we seem to log nothing.

What am I doing wrong here, both in my fork and in the vanilla code? For my purposes, I really need to be able to log the payload size.

dizda commented 2 years ago

Hey there,

I ran the server example with RUST_LOG=debug cargo run --example server -- --listen-addr 127.0.0.1:1337 password -u admin -p password.

I do get the right number of bytes transferred, as shown in the logs:

[2022-05-07T04:03:39Z INFO fast_socks5::server] transfer closed (79, 552)

An improvement we could make would be to have the server::transfer() function return the number of bytes transferred: https://github.com/dizda/fast-socks5/blob/f711ae0f26332975ab06fd89c818f622328038a2/src/server.rs#L643
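
A minimal sketch of that idea, assuming transfer() currently wraps tokio::io::copy_bidirectional and discards the counts (the signature below is illustrative, not the crate's actual one):

// Hypothetical version of server::transfer() that propagates the byte
// counts from tokio::io::copy_bidirectional instead of discarding them.
use tokio::io::{AsyncRead, AsyncWrite};

pub async fn transfer<I, O>(mut inbound: I, mut outbound: O) -> std::io::Result<(u64, u64)>
where
    I: AsyncRead + AsyncWrite + Unpin,
    O: AsyncRead + AsyncWrite + Unpin,
{
    let (sent, received) = tokio::io::copy_bidirectional(&mut inbound, &mut outbound).await?;
    log::info!("transfer closed ({}, {})", sent, received);
    Ok((sent, received))
}

The caller could then log or aggregate the counts per connection instead of only seeing them in the server's own log line.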

GlenDC commented 2 years ago

Yes, that would be one step in the right direction, even though the value will still need to bubble up to the caller.

A related problem, though, is that tokio::io::copy_bidirectional(&mut inbound, &mut outbound).await doesn't return those values in case of an error, which on macOS can happen simply because the client closes the connection (see #23).
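
One possible workaround (a sketch, not something tokio or this crate provides) is to count bytes in a wrapper around each stream, so the totals survive even when copy_bidirectional returns an error:

// Sketch: a byte-counting wrapper around any AsyncRead + AsyncWrite stream.
// The counters live on the wrapper rather than in copy_bidirectional's
// return value, so they remain readable after an error. `Counted` is a
// hypothetical helper, not part of fast-socks5 or tokio.
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

struct Counted<S> {
    inner: S,
    read: u64,
    written: u64,
}

impl<S> Counted<S> {
    fn new(inner: S) -> Self {
        Counted { inner, read: 0, written: 0 }
    }
}

impl<S: AsyncRead + Unpin> AsyncRead for Counted<S> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let before = buf.filled().len();
        let res = Pin::new(&mut self.inner).poll_read(cx, buf);
        if let Poll::Ready(Ok(())) = res {
            // Count only the bytes this particular read added to the buffer.
            self.read += (buf.filled().len() - before) as u64;
        }
        res
    }
}

impl<S: AsyncWrite + Unpin> AsyncWrite for Counted<S> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        data: &[u8],
    ) -> Poll<io::Result<usize>> {
        let res = Pin::new(&mut self.inner).poll_write(cx, data);
        if let Poll::Ready(Ok(n)) = res {
            self.written += n as u64;
        }
        res
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut self.inner).poll_flush(cx)
    }

    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut self.inner).poll_shutdown(cx)
    }
}

With both ends wrapped (e.g. let mut inbound = Counted::new(inbound);), the read/written fields can be inspected after the copy, whether it ended in Ok or Err.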

GlenDC commented 2 years ago

Going to close this one, as the work I already contributed, combined with the root cause of #23, explains why we do not always receive the byte counts. Trying to track them from within this crate wouldn't resolve it, or even work, given the issues causing it.

Closing this as such.