omjadas / hudsucker

Intercepting HTTP/S proxy
https://crates.io/crates/hudsucker
Apache License 2.0
205 stars 34 forks source link

I found that "text/event-stream;" type not send one by one, It send when all the content down #119

Closed songjiachao closed 2 months ago

songjiachao commented 2 months ago

iShot_2024-06-03_19 07 15

omjadas commented 2 months ago

Could you please provide your implementation of HttpHandler?

songjiachao commented 2 months ago
async fn handle_response(&mut self, _ctx: &HttpContext, mut res: Response<Body>) -> Response<Body> {
    res = decode_response(res).unwrap();
    let req = self.req().clone().unwrap();

    let body_mut = res.body_mut();
    let body_bytes: bytes::Bytes = body_mut.collect().await.unwrap_or_default().to_bytes();

    let output_response = ProxiedResponse::new(
      res.status(),
      res.version(),
      res.headers().clone(),
      body_bytes,
      chrono::Local::now().timestamp_nanos_opt().unwrap(),
      hosts::get_ip(req.uri().host().unwrap()),
    );

    self.set_res(output_response).send_output();

    res
  }
omjadas commented 2 months ago

Collecting the body will wait for all of the events to be received before continuing. That is why you are seeing all events being sent from the proxy to the browser in one go. If you want to inspect the response body, I would recommend streaming the body and inspecting each frame one at a time.

songjiachao commented 2 months ago

Thank you very much . Can you help me how to streaming the body and inspecting each frame one at a time.

omjadas commented 2 months ago

One way would be to do something like this

use futures::StreamExt;
use http_body_util::{BodyStream, StreamBody};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

async fn handle_response(&mut self, _ctx: &HttpContext, res: Response<Body>) -> Response<Body> {
    let (tx, rx) = mpsc::channel(100);
    let (parts, body) = res.into_parts();
    let mut body = BodyStream::new(body);

    tokio::spawn(async move {
        while let Some(frame) = body.next().await {
            if let Ok(frame) = frame.as_ref() {
                // inspect frame
            }

            tx.send(frame).await.unwrap();
        }
    });

    Response::from_parts(parts, StreamBody::new(ReceiverStream::new(rx)).into())
}