actix / examples

Community showcase and examples of Actix Web ecosystem usage.
Apache License 2.0

example of websocket proxy #269

Open GopherJ opened 4 years ago

antonok-edm commented 4 years ago

For what it's worth - this example seems to work great as a WebSocket proxy, as long as the timeout is increased (otherwise the server will bail with status 500 after just 5 seconds):

     let forwarded_req = client
         .request_from(new_url.as_str(), req.head())
+        .timeout(std::time::Duration::from_secs(60))
         .no_decompress();

mbround18 commented 2 years ago

@GopherJ and @antonok-edm I tried the timeout suggestion, but it doesn't seem to proxy the websockets. Did y'all make some changes outside the example to allow websocket connections?

antonok-edm commented 2 years ago

@mbround18 I don't think it works anymore, unfortunately. If I recall correctly, it worked with actix-web 1.0 and 2.0 but not after upgrading to 3.0.

It'd be great to have an official sample implementation and/or test cases for a websocket proxy.

mbround18 commented 2 years ago

@antonok-edm got some great feedback on discord!

@fakeshadow#3395 on Discord is awesome and recommended the following (not a working example, but a great route to go down):

use actix_web::{
    get,
    web::{BytesMut, Payload},
    Error, HttpRequest, HttpResponse,
};

use futures::{channel::mpsc::unbounded, sink::SinkExt, stream::StreamExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[get("/")]
async fn index(req: HttpRequest, mut payload: Payload) -> HttpResponse {
    // collect proxy info from request.

    // forward request and get response
    let (res, socket) = awc::Client::new()
        .ws("ws://foo.bar")
        .connect()
        .await
        .unwrap();

    // check if response is switching protocol and continue.
    assert_eq!(res.status().as_u16(), 101);

    // take the websocket io only so we can transfer raw binary data between source and dest.
    let mut io = socket.into_parts().io;

    // a channel for push response body to stream.
    let (mut tx, rx) = unbounded();

    // a buffer to read from dest and proxy it back to source.
    let mut buf = BytesMut::new();

    // spawn a task read payload stream and forward to websocket connection.
    actix_web::rt::spawn(async move {
        loop {
            tokio::select! {
                // body from source.
                res = payload.next() => {
                    match res {
                        None => return,
                        Some(body) => {
                            let body = body.unwrap();
                            io.write_all(&body).await.unwrap();
                        }
                    }
                }

                // body from dest.
                res = io.read_buf(&mut buf) => {
                    let size = res.unwrap();
                    // a zero-length read means dest closed the connection.
                    if size == 0 {
                        return;
                    }
                    let bytes = buf.split_to(size).freeze();
                    tx.send(Ok::<_, Error>(bytes)).await.unwrap();
                }
            }
        }
    });

    // return response.
    HttpResponse::SwitchingProtocols().streaming(rx)
}

They also mentioned combining awc and actix: accept the incoming websocket, offload it to the proxied websocket, and pass that connection context back to the client.

jwalton commented 1 year ago

Thanks @mbround18! This solution got me going in the right direction.

One disadvantage of @mbround18's solution above is that when you call client.ws().connect(), the client always sets the Sec-WebSocket-Version header to 13 in the request to the backend. There doesn't seem to be a way to prevent this, and awc has made private various bits that you'd need in order to reimplement it. If the client really is connecting with version 13 (which it will be, unless a new version of the websocket protocol gets released or someone is using a REALLY old browser), then everything works just fine. If the client is using a different version, though, we'll send the wrong websocket version to the backend, and it may or may not have problems reading the bytes we're forwarding.
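To make the mismatch concrete, here is a minimal standard-library sketch of the check a proxy could run before forwarding. It is not part of awc or actix-web: the function name `mismatched_ws_version` and the slice-of-pairs header representation are hypothetical stand-ins for a real header map, and the constant reflects the version 13 behaviour described above.

```rust
/// The version the proxy's upstream request would carry (13, per the comment above).
const PROXY_WS_VERSION: &str = "13";

/// Returns the client's requested version when it differs from the one the
/// proxy would send upstream, or `None` when they agree or the header is absent.
/// `headers` is a hypothetical, simplified stand-in for a real header map.
fn mismatched_ws_version<'a>(headers: &[(&'a str, &'a str)]) -> Option<&'a str> {
    headers
        .iter()
        // Header names are case-insensitive.
        .find(|&&(name, _)| name.eq_ignore_ascii_case("sec-websocket-version"))
        .map(|&(_, value)| value.trim())
        .filter(|value| *value != PROXY_WS_VERSION)
}

fn main() {
    let modern = [("Upgrade", "websocket"), ("Sec-WebSocket-Version", "13")];
    let legacy = [("upgrade", "websocket"), ("sec-websocket-version", "8")];

    // A version 13 client matches what the proxy sends upstream.
    assert_eq!(mismatched_ws_version(&modern), None);
    // A client on another version would be misrepresented to the backend.
    assert_eq!(mismatched_ws_version(&legacy), Some("8"));
}
```

A proxy built on the awc approach could use a check like this to reject or log connections it cannot represent faithfully, rather than silently forwarding bytes under the wrong version.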

This is my solution, replacing awc with reqwest. This forwards all of the headers from the client through to the backend, and vice versa, so the proxy doesn't care what version of websocket is being used so long as the backend understands it:

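// Imports this function relies on. The `futures` crate is assumed as the
// source of `StreamExt`, matching the earlier snippet.
use actix_web::{rt, web::Payload, HttpRequest, HttpResponse};
use futures::StreamExt;

pub async fn proxy_ws_request(
    client_req: &HttpRequest,
    client_stream: Payload,
    mut target_url: url::Url,
) -> Result<HttpResponse, Box<dyn std::error::Error>> {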
    // Force "http" or else the reqwest client will complain.
    target_url.set_scheme("http").unwrap();

    // Forward the request.
    let mut req = reqwest::ClientBuilder::new().build().unwrap().get(target_url);
    for (key, value) in client_req.headers() {
        req = req.header(key, value);
    }
    let target_response = req.send().await.unwrap();

    // Make sure the server is willing to accept the websocket.
    let status = target_response.status().as_u16();
    if status != 101 {
        return Err(Box::new(std::io::Error::new(
            std::io::ErrorKind::ConnectionRefused,
            "Target did not reply with 101 upgrade",
        )));
    }

    // Copy headers from the target back to the client.
    let mut client_response = HttpResponse::SwitchingProtocols();
    client_response.upgrade("websocket");
    for (header, value) in target_response.headers() {
        client_response.insert_header((header.to_owned(), value.to_owned()));
    }

    let target_upgrade = target_response.upgrade().await?;
    let (target_rx, mut target_tx) = tokio::io::split(target_upgrade);

    // Copy byte stream from the client to the target.
    rt::spawn(async move {
        let mut client_stream = client_stream.map(|result| {
            result.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
        });
        let mut client_read = tokio_util::io::StreamReader::new(&mut client_stream);
        let result = tokio::io::copy(&mut client_read, &mut target_tx).await;
        if let Err(err) = result {
            println!("Error proxying websocket client bytes to target: {err}")
        }
    });

    // Copy byte stream from the target back to the client.
    let target_stream = tokio_util::io::ReaderStream::new(target_rx);
    Ok(client_response.streaming(target_stream))
}
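
The core of the function above is a raw byte relay in both directions once the upgrade has succeeded; the proxy never parses WebSocket frames. As a rough illustration of that idea only, here is a standard-library sketch that uses blocking TCP sockets and threads instead of actix-web, reqwest, and tokio. The names `relay` and `roundtrip_through_proxy` are made up for this example, and the echo backend stands in for a real upstream server.

```rust
use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::thread;

/// Copies raw bytes in both directions between `client` and `target` until
/// both sides have finished sending. No WebSocket frames are parsed.
fn relay(client: TcpStream, target: TcpStream) -> io::Result<()> {
    let (mut client_rx, mut client_tx) = (client.try_clone()?, client);
    let (mut target_rx, mut target_tx) = (target.try_clone()?, target);

    // Client -> target, on its own thread.
    let upstream = thread::spawn(move || {
        let copied = io::copy(&mut client_rx, &mut target_tx);
        // Propagate end-of-stream so the target sees the client finish.
        let _ = target_tx.shutdown(Shutdown::Write);
        copied
    });

    // Target -> client, on the current thread.
    io::copy(&mut target_rx, &mut client_tx)?;
    let _ = client_tx.shutdown(Shutdown::Write);

    upstream.join().expect("relay thread panicked")?;
    Ok(())
}

/// Starts a one-shot echo backend and a one-shot proxy in front of it, sends
/// `msg` through the proxy, and returns whatever comes back.
fn roundtrip_through_proxy(msg: &[u8]) -> io::Result<Vec<u8>> {
    let backend = TcpListener::bind("127.0.0.1:0")?;
    let backend_addr = backend.local_addr()?;
    thread::spawn(move || -> io::Result<()> {
        let (mut conn, _) = backend.accept()?;
        let mut reader = conn.try_clone()?;
        io::copy(&mut reader, &mut conn)?; // echo until the peer stops sending
        Ok(())
    });

    let proxy = TcpListener::bind("127.0.0.1:0")?;
    let proxy_addr = proxy.local_addr()?;
    thread::spawn(move || -> io::Result<()> {
        let (client, _) = proxy.accept()?;
        relay(client, TcpStream::connect(backend_addr)?)
    });

    let mut conn = TcpStream::connect(proxy_addr)?;
    conn.write_all(msg)?;
    conn.shutdown(Shutdown::Write)?; // signal that we are done sending
    let mut reply = Vec::new();
    conn.read_to_end(&mut reply)?;
    Ok(reply)
}

fn main() -> io::Result<()> {
    let reply = roundtrip_through_proxy(b"hello over the proxy")?;
    assert_eq!(reply, b"hello over the proxy");
    Ok(())
}
```

The async version above follows the same shape: one spawned task copies client bytes to the target, while the response body streams target bytes back to the client.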

stillinbeta commented 6 months ago

I've written a small crate, though it uses awc, so @jwalton's remarks about the websocket version still hold.