omjadas / hudsucker

Intercepting HTTP/S proxy
https://crates.io/crates/hudsucker
Apache License 2.0
206 stars 35 forks source link

Access to the open websockets #55

Closed hazyone closed 1 year ago

hazyone commented 1 year ago

I'd appreciate it if there was something like with_connects_collector that takes a Vec(for example) and uses it to store client/server connections to allow not only processing but also sending user data packets.

omjadas commented 1 year ago

If I understand this correctly, you wish to be able to send websocket messages arbitrarily, and not only modify messages. Does that sound right to you?

hazyone commented 1 year ago

Hi! Yes, send websocket messages arbitrarily to both sides: client/server. This is useful for testing purposes, when you need to check if client/server can handle payloads that you can't easily reproduce in the production.

omjadas commented 1 year ago

@hazyone I have added a handle_websocket function to the WebSocketHandler trait in v0.19.3. It is passed the streams and sinks for the underlying WebSockets. You will likely want to implement it something like the following (the spawned task will probably need to be adjusted to better suit your use case).

async fn handle_websocket(
    mut self,
    ctx: WebSocketContext,
    mut stream: impl Stream<Item = Result<Message, tungstenite::Error>> + Unpin + Send + 'static,
    sink: impl Sink<Message, Error = tungstenite::Error> + Unpin + Send + 'static,
) {
    let sink_1 = Arc::new(tokio::sync::Mutex::new(sink));
    let sink_2 = Arc::clone(&sink_1);

    tokio::task::spawn(async move {
        loop {
            sink_1
                .lock()
                .await
                .send(Message::Text("123".to_owned()))
                .await
                .unwrap();

            tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
        }
    });

    while let Some(message) = stream.next().await {
        match message {
            Ok(message) => {
                let Some(message) = self.handle_message(&ctx, message).await else {
                    continue
                };

                match sink_2.lock().await.send(message).await {
                    Err(tungstenite::Error::ConnectionClosed) => (),
                    Err(e) => error!("WebSocket send error: {}", e),
                    _ => (),
                }
            }
            Err(e) => {
                error!("WebSocket message error: {}", e);

                match sink_2.lock().await.send(Message::Close(None)).await {
                    Err(tungstenite::Error::ConnectionClosed) => (),
                    Err(e) => error!("WebSocket close error: {}", e),
                    _ => (),
                };

                break;
            }
        }
    }
}