denoland / fastwebsockets

A fast RFC6455 WebSocket implementation
https://docs.rs/fastwebsockets/
Apache License 2.0
811 stars 56 forks source link

proxy connection problem #74

Closed guotie closed 1 month ago

guotie commented 4 months ago

I use the same proxy connection connect to websocket, tungstenite works well, but fastwebsockets failed.

when connect to wss://echo.websocket.org, the error is connection closed before message completed;

when connect to wss://ws.okx.com:8443/ws/v5/public, the error is Invalid status code: 400.

code is below:


use std::future::Future;

use anyhow::Result;
use bytes::Bytes;
use fastwebsockets::handshake;
use fastwebsockets::FragmentCollector;
use http_body_util::Empty;
use hyper::header::CONNECTION;
use hyper::header::UPGRADE;
use hyper::rt::{Read, Write};
use hyper::upgrade::Upgraded;
use hyper::Request;
use hyper_util::rt::TokioIo;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

struct SpawnExecutor;

impl<Fut> hyper::rt::Executor<Fut> for SpawnExecutor
where
    Fut: Future + Send + 'static,
    Fut::Output: Send + 'static,
{
    fn execute(&self, fut: Fut) {
        tokio::task::spawn(fut);
    }
}

pub async fn connect(
    uri: &str,
    proxy_url: &str,
) -> Result<FragmentCollector<TokioIo<Upgraded>>> {
    let url = uri.parse::<hyper::Uri>()?;
    let host = url.host().unwrap();
    let port = url.port_u16().unwrap_or(443);
    let req = Request::builder()
        .method("GET")
        .uri(uri)
        .header("Host", host)
        .header(UPGRADE, "websocket")
        .header(CONNECTION, "upgrade")
        .header("Sec-WebSocket-Key", handshake::generate_key())
        .header("Sec-WebSocket-Version", "13")
        .body(Empty::<Bytes>::new())?;

    let proxy = proxy_url.parse::<hyper::Uri>()?;
    let proxy_host = proxy.host().unwrap();
    let proxy_port = proxy.port_u16().unwrap_or(7890);
    let tcp_stream = TcpStream::connect(format!("{}:{}", proxy_host, proxy_port)).await?;

    let tcp_stream = tunnel(tcp_stream, host.to_string(), port).await;

    let (ws, _) = handshake::client(&SpawnExecutor, req, tcp_stream).await?;  // both failed at here!
    Ok(FragmentCollector::new(ws))
}

async fn tunnel(mut conn: TcpStream, host: String, port: u16) -> TcpStream {
    let mut buf = format!(
        "CONNECT {host}:{port} HTTP/1.1\r\nHost: {host}:{port}\r\nProxy-Connection: Keep-Alive\r\n"
    )
    .into_bytes();

    buf.extend_from_slice(b"\r\n");

    conn.write(&buf).await.unwrap();
    let mut buf = [0; 8192];
    let mut pos = 0;

    loop {
        let n = conn.read(&mut buf[pos..]).await.unwrap();

        if n == 0 {
            panic!("unexpected eof while tunneling");
        }
        pos += n;

        let recvd = &buf[..pos];
        if recvd.starts_with(b"HTTP/1.1 200") || recvd.starts_with(b"HTTP/1.0 200") {
            if recvd.ends_with(b"\r\n\r\n") {
                println!("tunnel {}:{} success", host, port);
                return conn;
            }
            if pos == buf.len() {
                panic!("proxy headers too long for tunnel");
            }
            // else read more
        } else if recvd.starts_with(b"HTTP/1.1 407") {
            panic!("proxy authentication required");
        } else {
            panic!("unsuccessful tunnel");
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::okx2::{connect};
    use fastwebsockets::{Frame, OpCode};

    #[tokio::test]
    async fn depth_connect() {
        let target = "wss://echo.websocket.org";
        // let target = "wss://ws.okx.com:8443/ws/v5/public";
        let proxy = "http://127.0.0.1:7890";
        let mut ws = connect(target, proxy).await.unwrap();

        // ws.write_frame("");
        loop {
            let msg = match ws.read_frame().await {
                Ok(msg) => msg,
                Err(e) => {
                    println!("Error: {}", e);
                    ws.write_frame(Frame::close_raw(vec![].into()))
                        .await
                        .unwrap();
                    break;
                }
            };

            match msg.opcode {
                OpCode::Text => {
                    let payload =
                        String::from_utf8(msg.payload.to_vec()).expect("Invalid UTF-8 data");
                    println!("{:?}", payload);
                }
                OpCode::Close => {
                    break;
                }
                _ => {}
            }
        }
    }
}
erebe commented 1 month ago

@guotie I manage to use fastwebsocket with a proxy. You can take a look at it there https://github.com/erebe/wstunnel/blob/main/src/tcp.rs#L104 https://github.com/erebe/wstunnel/blob/main/src/tunnel/transport/websocket.rs#L136

littledivy commented 1 month ago

Closing as it is answered and doesn't look like a fastwebsockets bug.