hatoo / http-mitm-proxy

A HTTP proxy server library intended to be a backend of application like Burp proxy
MIT License

docs(examples): Request and Response Body Cloning While Forwarding Streams #58

Closed flaviodelgrosso closed 2 weeks ago

flaviodelgrosso commented 2 weeks ago

First off, thank you for developing and maintaining this crate. I'm currently working on a feature that involves forwarding HTTP requests and responses between a client and an upstream server. Specifically, I need to copy data from the streams while forwarding them, allowing me to save the contents of both requests and responses in a buffer for later processing.


Problem:

When handling the response body from the upstream server, I'm able to forward it back to the client and clone the body successfully. However, I'm encountering difficulties when trying to apply the same logic to incoming requests.

For Responses:

I implemented a BodyClone struct (see code below) and managed to forward the response while cloning its body, boxing the body as in your examples:

Ok::<_, http_mitm_proxy::default_client::Error>(res.map(|b| b.boxed()))

This works well for responses, allowing me to forward and clone the response body simultaneously.

For Requests:

The challenge arises with requests. The .bind method expects a service that takes a request of type Request<Incoming>, so there is a type mismatch between Request<Incoming> and Request<BoxBody<bytes::Bytes, hyper::Error>>.

Could you add some examples to the repo on this topic, or suggest better solutions than mine? Thanks in advance.


That's the full example:

use bytes::Bytes;
use hyper::body::{Body, Frame, Incoming, SizeHint};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc;

pub struct BodyClone<T> {
  inner: T,
  tx: mpsc::Sender<Bytes>,
  end_stream: bool,
}

impl BodyClone<Incoming> {
  pub fn new(body: Incoming) -> (Self, mpsc::Receiver<Bytes>) {
    let (tx, rx) = mpsc::channel(10);
    (
      BodyClone {
        inner: body,
        tx,
        end_stream: false,
      },
      rx,
    )
  }

  pub fn into_inner(self) -> Incoming {
    self.inner
  }
}

impl Body for BodyClone<Incoming> {
  type Data = Bytes;
  type Error = hyper::Error;

  fn poll_frame(
    mut self: Pin<&mut Self>,
    cx: &mut Context<'_>,
  ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
    match Pin::new(&mut self.inner).poll_frame(cx) {
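      Poll::Ready(Some(Ok(frame))) => {
        // Copy data frames into the channel; non-data frames (e.g. trailers)
        // are forwarded unchanged instead of ending the stream early.
        if let Some(data) = frame.data_ref() {
          // try_send never blocks, but the copy is dropped if the channel is full.
          let _ = self.tx.try_send(data.clone());
        }
        Poll::Ready(Some(Ok(frame)))
      }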
      Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
      Poll::Ready(None) => {
        self.end_stream = true;
        Poll::Ready(None)
      }
      Poll::Pending => Poll::Pending,
    }
  }

  fn is_end_stream(&self) -> bool {
    self.end_stream
  }

  fn size_hint(&self) -> SizeHint {
    self.inner.size_hint()
  }
}

type ClonedBodyAndReceiver = (BodyClone<Incoming>, mpsc::Receiver<bytes::Bytes>);

pub trait BodyCloning {
  fn clone(self) -> ClonedBodyAndReceiver;
}

impl BodyCloning for Incoming {
  fn clone(self) -> ClonedBodyAndReceiver {
    let (cloned_body, rx) = BodyClone::new(self);
    (cloned_body, rx)
  }
}
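
Stripped of the hyper types, the idea behind BodyClone is a "tee": every chunk is handed on unchanged while a copy goes into a bounded channel. The following is only a minimal std-only sketch of that idea, not the code above. `TeeChunks` and `tee_demo` are hypothetical names, an iterator stands in for the body, and `std::sync::mpsc::sync_channel` stands in for the tokio channel. It also makes one property of `try_send` visible: it never blocks, so when the receiver falls behind and the channel is full, the copy is lost while forwarding continues.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// Hypothetical std-only stand-in for BodyClone: forwards chunks from `inner`
/// and sends a copy of each one into a bounded channel.
pub struct TeeChunks<I> {
    inner: I,
    tx: SyncSender<Vec<u8>>,
    /// Number of copies that could not be sent (channel full or receiver gone).
    pub dropped: usize,
}

impl<I: Iterator<Item = Vec<u8>>> TeeChunks<I> {
    pub fn new(inner: I, capacity: usize) -> (Self, Receiver<Vec<u8>>) {
        let (tx, rx) = sync_channel(capacity);
        (TeeChunks { inner, tx, dropped: 0 }, rx)
    }
}

impl<I: Iterator<Item = Vec<u8>>> Iterator for TeeChunks<I> {
    type Item = Vec<u8>;

    fn next(&mut self) -> Option<Vec<u8>> {
        let chunk = self.inner.next()?;
        // try_send never blocks; on a full channel the copy is lost,
        // but the original chunk is still forwarded.
        if self.tx.try_send(chunk.clone()).is_err() {
            self.dropped += 1;
        }
        Some(chunk)
    }
}

/// Forwards three chunks through a tee of the given capacity while nobody
/// drains the receiver, then returns
/// (forwarded bytes, copied bytes, number of dropped copies).
pub fn tee_demo(capacity: usize) -> (Vec<u8>, Vec<u8>, usize) {
    let chunks = vec![b"ab".to_vec(), b"cd".to_vec(), b"ef".to_vec()];
    let (mut tee, rx) = TeeChunks::new(chunks.into_iter(), capacity);

    let mut forwarded = Vec::new();
    for chunk in tee.by_ref() {
        forwarded.extend_from_slice(&chunk);
    }

    let dropped = tee.dropped;
    drop(tee); // closes the sender so the receiver iteration below terminates
    let copied: Vec<u8> = rx.iter().flatten().collect();
    (forwarded, copied, dropped)
}
```

In this sketch, `tee_demo(2)` forwards all six bytes but copies only the first four, because the third copy finds the channel full. A larger capacity, or an awaited send in the async version, would avoid the loss at the cost of memory or backpressure.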

This is the implementation:


// -----------------------------------
let server = proxy
    .bind(("127.0.0.1", opt.port), move |_client_addr, mut req| {
      let client = client.clone();
      async move {
        let uri = req.uri().clone();

        if uri.host() == Some("my_interested_host_1") {
          let new_req = inspect_request(req).await?;
          req = new_req; // <-- here is the problem: mismatched types (Request<BoxBody<..>> vs. Request<Incoming>)
        }

        let (res, _upgrade) = client.send_request(req).await?;

        if uri.host() == Some("my_interested_host_2") {
          let res = inspect_response(res).await?;
          return Ok(res);
        }

        Ok::<_, http_mitm_proxy::default_client::Error>(res.map(|b| b.boxed()))
      }
    })
    .await?;

// -----------------------------------

pub async fn inspect_request(
  req: Request<Incoming>,
) -> anyhow::Result<
  Request<BoxBody<bytes::Bytes, hyper::Error>>,
  http_mitm_proxy::default_client::Error,
> {
  let (parts, body) = req.into_parts();

  let cloned_body = inspect_body(body);
  let new_req = Request::from_parts(parts, cloned_body.boxed());

  Ok(new_req)
}

// Function to log each chunk of the response body without consuming it.
pub async fn inspect_response(
  res: Response<Incoming>,
) -> anyhow::Result<Response<BoxBody<Bytes, hyper::Error>>, http_mitm_proxy::default_client::Error>
{
  let (parts, body) = res.into_parts();

  let cloned_body = inspect_body(body);
  let new_res = Response::from_parts(parts, cloned_body.boxed());

  Ok(new_res)
}

fn inspect_body(body: Incoming) -> BodyClone<Incoming> {
  let (cloned_body, mut rx) = body.clone();

  tokio::spawn(async move {
    let mut buf = Vec::new();
    while let Some(chunk) = rx.recv().await {
      buf.extend_from_slice(&chunk);
    }

    if let Ok(body_str) = String::from_utf8(buf) {
      println!("Body: {body_str}");
    }
  });

  cloned_body
}
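
Note that inspect_body prints nothing when the collected bytes are not valid UTF-8, for example for compressed or binary bodies. One possible fallback is sketched below with std only. `describe_body` is a hypothetical helper, not part of the code above or of http-mitm-proxy.

```rust
/// Hypothetical helper: returns the body as text when it is valid UTF-8,
/// otherwise a short summary so that binary bodies are not silently skipped.
pub fn describe_body(buf: &[u8]) -> String {
    match std::str::from_utf8(buf) {
        Ok(text) => format!("Body: {text}"),
        Err(_) => format!("Body: <{} bytes, not valid UTF-8>", buf.len()),
    }
}
```

The spawned task could then print `describe_body(&buf)` instead of printing only in the `Ok` case.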

hatoo commented 2 weeks ago

You can convert Request<Incoming> to Request<BoxBody<bytes::Bytes, hyper::Error>>.

let req = if uri.host() == Some("my_interested_host_1") {
    let new_req = inspect_request(req).await?;
    new_req // <-------------------------------------------- here the problem
} else {
    req.map(|b| b.boxed())
};
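
The reason this works is that an `if`/`else` expression has a single type, so the untouched request has to be boxed as well. Below is a std-only sketch of the same pattern, with `Box<dyn Iterator>` standing in for BoxBody. `ChunkStream` and `maybe_inspect` are hypothetical names used only for illustration.

```rust
/// Type-erased chunk stream; plays the role that BoxBody plays in the snippet above.
pub type ChunkStream = Box<dyn Iterator<Item = Vec<u8>>>;

/// Returns the chunks either wrapped in an inspecting adapter or untouched.
/// The two branches build different concrete iterator types, so each one is
/// boxed to give the `if`/`else` expression a single type.
pub fn maybe_inspect(chunks: Vec<Vec<u8>>, inspect: bool) -> ChunkStream {
    let source = chunks.into_iter();
    if inspect {
        // Concrete type here: Inspect<IntoIter<Vec<u8>>, {closure}>
        Box::new(source.inspect(|c| println!("chunk of {} bytes", c.len())))
    } else {
        // Concrete type here: IntoIter<Vec<u8>>
        Box::new(source)
    }
}
```

Callers see the same `ChunkStream` type regardless of which branch ran, at the cost of one allocation and dynamic dispatch per call to `next`.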

flaviodelgrosso commented 2 weeks ago

Totally right, I'm stupid :). Thanks for the suggestion. It would be nice, however, to have a concrete Body struct that can represent all body types. Something like:

#[derive(Debug)]
enum Internal {
    BoxBody(BoxBody<Bytes, Error>),
    Collected(Collected<Bytes>),
    Empty(Empty<Bytes>),
    Full(Full<Bytes>),
    Incoming(Incoming),
    String(String),
}

#[derive(Debug)]
pub struct Body {
    inner: Internal,
}

// continue with implementation.....
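
A rough idea of how such a wrapper might continue, sketched with std only so that it compiles without hyper or http-body-util: one enum over several ways of holding bytes, plus a single method that yields chunks from whichever variant is active. `AnyBody`, `next_chunk`, and `collect_bytes` are hypothetical names. A real version would presumably implement the `Body` trait instead and delegate `poll_frame` to the active variant.

```rust
use std::collections::VecDeque;

/// Hypothetical unified body: each variant is a different way of holding bytes.
#[derive(Debug)]
pub enum AnyBody {
    Empty,
    Full(Option<Vec<u8>>),
    Text(Option<String>),
    Chunked(VecDeque<Vec<u8>>),
}

impl AnyBody {
    /// Returns the next chunk, or None once the body is exhausted.
    pub fn next_chunk(&mut self) -> Option<Vec<u8>> {
        match self {
            AnyBody::Empty => None,
            AnyBody::Full(data) => data.take(),
            AnyBody::Text(text) => text.take().map(String::into_bytes),
            AnyBody::Chunked(queue) => queue.pop_front(),
        }
    }

    /// Drains the remaining chunks into one buffer.
    pub fn collect_bytes(&mut self) -> Vec<u8> {
        let mut buf = Vec::new();
        while let Some(chunk) = self.next_chunk() {
            buf.extend_from_slice(&chunk);
        }
        buf
    }
}

impl From<String> for AnyBody {
    fn from(text: String) -> Self {
        AnyBody::Text(Some(text))
    }
}

impl From<Vec<u8>> for AnyBody {
    fn from(data: Vec<u8>) -> Self {
        AnyBody::Full(Some(data))
    }
}
```

Compared with boxing, an enum like this keeps the set of body types closed and avoids dynamic dispatch, but every new body type needs a new variant and a new match arm.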