SuaveIO / suave

Suave is a simple web development F# library providing a lightweight web server and a set of combinators to manipulate route flow and task composition.
https://suave.io
Other
1.32k stars 198 forks source link

Writing a proxy WebPart using SocketTask #745

Closed njlr closed 3 years ago

njlr commented 3 years ago

I am trying to take a response from a WebClient and forward the stream to a SocketTask.

The data does come through, but the request never completes.

  // WebPart body from the question: sends an outgoing HTTP request and tries to
  // relay the response body to the client through a Suave `SocketTask`.
  // `client` and `message` are not defined in the visible snippet; presumably they
  // are set up in the elided part below.
  (fun ctx ->   
    async {
      // ...

      try
        // Issue the outgoing request and wait for the response.
        let! response =
          client.SendAsync message
          |> Async.AwaitTask

        // Obtain the response body as a stream rather than buffering it.
        let! contentStream =
          response.Content.ReadAsStreamAsync ()
          |> Async.AwaitTask

        // Socket task run by Suave with the client connection: passes the body
        // stream to `transferStreamWithBufer` together with a 1024-byte buffer,
        // then hands the connection back.
        let write (conn, _) = socket {
          let bytes = Array.create 1024 0uy
          let buffer = ArraySegment bytes

          do! transferStreamWithBufer buffer conn contentStream

          // Is something missing here?

          return conn
        }

        // Reply with status 200 and the socket task as the response content.
        return!
          {
            ctx with
              response =
                {
                  ctx.response with
                    status = HTTP_200.status
                    content = SocketTask write
                }
          }
          |> succeed
      with exn ->
        // Any exception raised above is printed to stdout and answered with a
        // plain-text body whose status is overridden to 502.
        printfn "%A" exn

        return!
          (
            OK "Unable to proxy the request. "
            >=> Writers.setStatus HTTP_502 // Bad gateway
          ) ctx
    })

What am I missing here?

ademar commented 3 years ago

Hmm... try setting the close header, as is done here:

https://github.com/SuaveIO/suave/blob/master/src/Suave/Combinators.fs#L861

A hack I know

njlr commented 3 years ago

Hmm, now I get

[16:52:23 ERR] Socket error while writing response SocketError ConnectionReset

I think I will expand my question to the actual use-case I have, which is updating the old proxy code to the latest Suave version.

So I am able to get a basic proxy to work, but only when I use a Bytes response. I would rather do some clever streaming with SocketTask but I am unable to figure out the API.

Here is my code, perhaps we can fix it up and make a PR out of it?


open System
open System.IO
open System.Net
open Suave
open Suave.Operators
open Suave.Successful
open Suave.RequestErrors
open Suave.Filters
open Suave.Sockets
open Suave.Sockets.Control

module DateTime =

  /// Attempts to parse `x` with `DateTime.TryParse`.
  /// Yields `Some value` when parsing succeeds and `None` otherwise.
  let tryParse (x : string) =
    let succeeded, parsed = DateTime.TryParse x
    if succeeded then Some parsed else None

module Int64 =

  /// Attempts to parse `x` with `Int64.TryParse`.
  /// Yields `Some value` when parsing succeeds and `None` otherwise.
  let tryParse (x : string) =
    let succeeded, parsed = Int64.TryParse x
    if succeeded then Some parsed else None

/// Header lookup operator: `headers ? ("Name")` returns the value of the first
/// pair whose key equals `name` under an ordinal, case-insensitive comparison,
/// or `None` when no key matches.
let private (?) headers (name : string) =
  let valueIfNamed (key, value) =
    if String.Equals(key, name, StringComparison.OrdinalIgnoreCase)
    then Some value
    else None

  Seq.tryPick valueIfNamed headers

/// Copies everything that remains in `stream` into memory and returns it as a
/// byte array. The temporary `MemoryStream` is disposed before returning; the
/// input `stream` is left open for the caller to manage.
let private readAllBytes (stream : Stream) =
  using (new MemoryStream ()) (fun sink ->
    stream.CopyTo sink
    sink.ToArray ())

/// Builds the Suave response for `ctx` from an upstream `HttpWebResponse`.
///
/// - status: the upstream status code when `HttpCode.tryParse` recognises it,
///   otherwise `HTTP_502`.
/// - headers: every key in `response.Headers` paired with `Headers.Get key`.
/// - content: the upstream body, read fully into memory and returned as `Bytes`.
///
/// Returns a copy of `ctx` whose `response` carries these three fields.
let private httpWebResponseToHttpContext (ctx : HttpContext) (response : HttpWebResponse) =
  let status =
    match HttpCode.tryParse (int response.StatusCode) with
    | Choice1Of2 x -> x.status
    | _ -> HTTP_502.status

  let headers =
    response.Headers.AllKeys
    |> Seq.map (fun k -> k, response.Headers.Get k)
    |> Seq.toList

  // Why doesn't this work?
  // let content =
  //   SocketTask
  //     (fun (conn, _) -> socket {
  //       let stream = response.GetResponseStream ()

  //       let bytes = Array.create 1024 0uy
  //       let buffer = ArraySegment bytes

  //       do! transferStreamWithBufer buffer conn stream

  //       let! conn = flush conn

  //       return conn
  //     })

  let content =
    // Dispose the upstream body stream as soon as it has been drained, so the
    // underlying connection is released instead of leaking until finalization.
    use stream = response.GetResponseStream ()
    readAllBytes stream |> Bytes

  {
    ctx with
      response =
        {
          ctx.response with
            status = status
            headers = headers
            content = content
        }
  }

/// Creates a `WebPart` that forwards the incoming request to `newHost` and relays
/// the upstream response back to the client.
///
/// The forwarded URL is `newHost`'s scheme, host and port combined with the
/// incoming request's path and query string. A fixed set of request headers
/// (User-Agent, Accept, Date, Host, Content-Type, Content-Length) is copied when
/// present, and the raw request body is forwarded for POST and PUT.
///
/// Upstream responses, including error responses surfaced as `WebException`
/// with a non-null `Response`, are converted with `httpWebResponseToHttpContext`.
/// Any other failure is logged and answered with status 502.
let proxy (newHost : Uri) : WebPart =
  (fun ctx ->
    async {
      // Keep the query string: `ctx.request.path` alone would silently drop it.
      let pathAndQuery =
        if String.IsNullOrEmpty ctx.request.rawQuery
        then ctx.request.path
        else sprintf "%s?%s" ctx.request.path ctx.request.rawQuery

      // Omit the port only when it is the default for this scheme. Checking for
      // "80 or 443" regardless of scheme would mis-route e.g. http://host:443.
      let remappedAddress =
        if newHost.IsDefaultPort
        then
          sprintf "%s://%s%s" newHost.Scheme newHost.Host pathAndQuery
        else
          sprintf "%s://%s:%i%s" newHost.Scheme newHost.Host newHost.Port pathAndQuery

      let request = WebRequest.Create remappedAddress :?> HttpWebRequest

      request.Method <- ctx.request.rawMethod
      request.Proxy <- null
      // Redirects are relayed to the client rather than followed here.
      request.AllowAutoRedirect <- false
      request.AllowReadStreamBuffering <- false
      request.AllowWriteStreamBuffering <- false

      // These headers are set through dedicated properties on HttpWebRequest.
      match ctx.request.headers ? ("User-Agent") with | Some x -> request.UserAgent <- x | None -> ()
      match ctx.request.headers ? ("Accept") with | Some x -> request.Accept <- x | None -> ()
      match ctx.request.headers ? ("Date") |> Option.bind DateTime.tryParse with | Some x -> request.Date <- x | None -> ()
      match ctx.request.headers ? ("Host") with | Some x -> request.Host <- x | None -> ()
      match ctx.request.headers ? ("Content-Type") with | Some x -> request.ContentType <- x | None -> ()
      match ctx.request.headers ? ("Content-Length") |> Option.bind Int64.tryParse with | Some x -> request.ContentLength <- x | None -> ()

      // NOTE(review): X-Forwarded-For conventionally carries the client address;
      // this sends `ctx.request.host` instead — confirm that is intended.
      request.Headers.Add("X-Forwarded-For", ctx.request.host)

      if [ HttpMethod.POST; HttpMethod.PUT ] |> Seq.contains ctx.request.method
      then
        // `use!` closes the request stream once the body has been written; the
        // stream must be closed before the response is requested.
        use! requestStream =
          request.GetRequestStreamAsync ()
          |> Async.AwaitTask

        // Write the whole body in one call instead of byte by byte.
        do! requestStream.AsyncWrite (ctx.request.rawForm, 0, ctx.request.rawForm.Length)

      try
        let! response = request.AsyncGetResponse ()
        let response = response :?> HttpWebResponse

        return httpWebResponseToHttpContext ctx response |> Some
      with
      // Non-success upstream statuses arrive as WebException carrying the
      // response; relay that response to the client unchanged.
      | :? WebException as ex when not (isNull ex.Response) ->
        let response = ex.Response :?> HttpWebResponse

        return httpWebResponseToHttpContext ctx response |> Some
      | exn ->
        ctx.runtime.logger.log
          Logging.Error
          (fun lvl ->
            Logging.Message.event lvl (sprintf "Unable to proxy the request %A %A. " ctx.request.rawMethod remappedAddress)
            |> Logging.Message.addExn exn)

        return!
          (
            OK "Unable to proxy the request. "
            >=> Writers.setStatus HTTP_502
          ) ctx
    })
njlr commented 3 years ago

Related: https://github.com/SuaveIO/suave/issues/636

ademar commented 3 years ago

Cool, I'd like to have Proxy back. I'll dedicate some time to this; right now I am stuck with HttpWebRequest returning 404s.

ademar commented 3 years ago

OK, the issue is that when using SocketTask, Suave does not finish writing the headers; in particular, the Content-Length header is not sent. After modifying your code like this, it seems to work all right.

/// Builds the Suave response for `ctx` from an upstream `HttpWebResponse`,
/// streaming the body to the client through a `SocketTask` instead of
/// buffering it in memory.
///
/// - status: the upstream status code when `HttpCode.tryParse` recognises it,
///   otherwise `HTTP_502`.
/// - headers: every key in `response.Headers` paired with `Headers.Get key`.
/// - content: a socket task that finishes the header section itself and then
///   copies the upstream body to the connection.
let private httpWebResponseToHttpContext (ctx : HttpContext) (response : HttpWebResponse) =
  let status =
    match HttpCode.tryParse (int response.StatusCode) with
    | Choice1Of2 x -> x.status
    | _ -> HTTP_502.status

  let headers =
    response.Headers.AllKeys
    |> Seq.map (fun k -> k, response.Headers.Get k)
    |> Seq.toList

  // Writes the upstream Content-Length header, if there is one, directly to
  // the connection.
  let writeContentLengthHeader conn = socket {
    match headers ? ("Content-Length") with
    | Some x ->
      let! (_, conn) = asyncWriteLn (String.Concat [| "Content-Length: "; x |]) conn
      return conn
    | None ->
      return conn
    }

  // With a SocketTask, Suave does not finish writing the headers (in particular
  // Content-Length is not sent), so the task writes the Content-Length line and
  // the blank line that ends the header section before streaming the body.
  let content =
    SocketTask
      (fun (conn, _) -> socket {
          let! conn = writeContentLengthHeader conn
          let! (_, conn) = asyncWriteLn "" conn
          let! conn = flush conn
          // Dispose the upstream body stream once the transfer finishes or
          // fails, so the upstream connection is released instead of leaking.
          use stream = response.GetResponseStream ()
          do! transferStream conn stream
          return conn
       })

  {
    ctx with
      response =
        {
          ctx.response with
            status = status
            headers = headers
            content = content
        }
  }
njlr commented 3 years ago

Interesting!

A few quick questions:

ademar commented 3 years ago

This becomes clear if you start in HttpOutput.writeResponse and follow through HttpOutput.writeContent.

You will see that the Content-Length header is not written. This makes sense because, in general, you would not have a Content-Length unless you knew in advance how many bytes were going to be written. In the proxy case we do know the Content-Length, because the downstream server tells us.

njlr commented 3 years ago

PR: https://github.com/SuaveIO/suave/pull/746