valyala / fasthttp

Fast HTTP package for Go. Tuned for high performance. Zero memory allocations in hot paths. Up to 10x faster than net/http
MIT License

Timeout stream response connection does not clear buffer data for re-use #1743

Closed TAYTS closed 6 months ago

TAYTS commented 7 months ago

I am using fasthttp as a proxy server that calls a downstream service returning a streamed response. When the downstream request times out before the streamed response body has been fully read, the next request fails to parse the response headers, because the clientConn is re-used and still holds data from the previous response.

In the default transport's RoundTrip(), the close function of resp.bodyStream releases the connection with hc.releaseConn(cc) unless closeConn or resp.ConnectionClose() is set. For a streamed response, would it be possible to close the TCP connection on timeout, so that dirty data from the previous request cannot leak into the next one? Maybe the close function passed to newCloseReader could check whether the read timed out or failed for another reason, and clean up the connection accordingly:

resp.bodyStream = newCloseReader(rbs, func() error {
    hc.releaseReader(br)
    if r, ok := rbs.(*requestStream); ok {
        releaseRequestStream(r)
    }
    if closeConn || resp.ConnectionClose() {
        hc.closeConn(cc)
    } else {
        hc.releaseConn(cc)
    }
    return nil
})

How to reproduce:

func main() {
    var lbc = fasthttp.LBClient{
        Clients: make([]fasthttp.BalancingClient, 0),
        Timeout: 1 * time.Second,
    }

    downStreamAdd := "127.0.0.1:3000"
    lbc.Clients = append(lbc.Clients, &fasthttp.HostClient{
        Addr:               downStreamAdd,
        StreamResponseBody: true,
        RetryIf: func(request *fasthttp.Request) bool {
            return false
        },
    })

    server := fasthttp.Server{
        Handler: func(ctx *fasthttp.RequestCtx) {
            err := lbc.Do(&ctx.Request, &ctx.Response)
            fmt.Println(err)
        },
    }

    if err := server.ListenAndServe("localhost:8080"); err != nil {
        fmt.Println("fail to start server", err.Error())
    }
}

  1. Create a stream response service that takes more than 1 second to return its data
  2. Make another request after the timeout
  3. Receive an error message saying the response header could not be read:
    fail to make request: error when reading response headers: cannot find whitespace in the first line of response "2\r\n9\n\r\n3\r\n10\n\r\n3\r\n11\n\r\n3\r\n12\n\r\n3\r\n13\n\r\n3\r\n14\n\r\n3\r\n15\n\r\n3\r\n16\n\r\n3\r\n17\n\r\n3\r\n18\n\r\n3\r\n19\n\r\n3\r\n20\n\r\n3\r\n21\n\r\n3\r\n22\n\r\n3\r\n23\n\r\n3\r\n24\n\r\n3\r\n25\n\r\n3\r\n26\n\r\n3\r\n27\n\r\n3\r\n28\n\r\n0\r\n\r\n". Buffer size=164, contents: "2\r\n9\n\r\n3\r\n10\n\r\n3\r\n11\n\r\n3\r\n12\n\r\n3\r\n13\n\r\n3\r\n14\n\r\n3\r\n15\n\r\n3\r\n16\n\r\n3\r\n17\n\r\n3\r\n18\n\r\n3\r\n19\n\r\n3\r\n20\n\r\n3\r\n21\n\r\n3\r\n22\n\r\n3\r\n23\n\r\n3\r\n24\n\r\n3\r\n25\n\r\n3\r\n26\n\r\n3\r\n27\n\r\n3\r\n28\n\r\n0\r\n\r\n"

I tried a dirty hack that always closes the clientConn when cleaning up response.bodyStream, but I am not sure whether there is a better way to do it, or whether this is a bug or I am misusing the API.

TAYTS commented 7 months ago

Proposed solution:

resp.bodyStream = newCloseReader(rbs, func() error {
    hc.releaseReader(br)
    if r, ok := rbs.(*requestStream); ok {
        releaseRequestStream(r)
    }
    // Also close the connection if its last use time is after the deadline,
    // instead of releasing it to the pool first and closing it afterwards.
    if closeConn || resp.ConnectionClose() || cc.lastUseTime.After(deadline) {
        hc.closeConn(cc)
    } else {
        hc.releaseConn(cc)
    }
    return nil
})

mdenushev commented 7 months ago

Same here. Example Python upstream:

from gevent import monkey; monkey.patch_all()
import gevent

from bottle import route, run

i = 0

@route('/stream')
def stream():
    global i
    i += 1
    yield str(i) + 'part1' + '.' * 30000
    gevent.sleep(60)
    yield str(i) + 'part2' + '.' * 30000
    gevent.sleep(60)
    yield str(i) + 'part3' + '.' * 30000

run(host='0.0.0.0', port=8080, server='gevent')

It looks like when a clientConn is re-used after a timeout that occurred while writing the response body stream, the next request on the same connection can read bytes left over from the first response body. Example with additional logs:

got conn 127.0.0.1:58268
buffered: 0
<nil>
write body stream before close err: read tcp4 127.0.0.1:58268->127.0.0.1:3002: i/o timeout
2024-04-11 16:45:20.005816 +0300 MSK m=+3.055472251 released conn
2024/04/11 16:45:20 error when serving connection "127.0.0.1:9001"<->"127.0.0.1:58267": read tcp4 127.0.0.1:58268->127.0.0.1:3002: i/o timeout
got conn 127.0.0.1:58268
got buffer starting with buffered count 0
got buffer after reset with buffered count 0
buffered: 0
error when reading response headers: cannot find whitespace in the first line of response "7536\r\n1part1.................
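
The failure mode in these logs can be shown in isolation: if unread chunked-body bytes sit in front of the next response on the same connection, the header parser sees a chunk-size line where it expects a status line. The sketch below uses net/http's parser rather than fasthttp's, so the error text differs from the one above. parseStatus is a hypothetical helper and the byte strings are made up to resemble the leftover data in the report.

```go
package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
)

// parseStatus (hypothetical) parses one HTTP response head from wire, as a
// client does after taking a connection from the pool.
func parseStatus(wire string) (int, error) {
	br := bufio.NewReader(strings.NewReader(wire))
	resp, err := http.ReadResponse(br, nil)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, nil
}

func main() {
	next := "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok"
	leftover := "3\r\n28\n\r\n0\r\n\r\n" // tail of the previous chunked body

	// Clean connection: the status line is the first thing on the wire.
	code, err := parseStatus(next)
	fmt.Println(code, err)

	// Dirty connection: stale chunk data precedes the real response.
	_, err = parseStatus(leftover + next)
	fmt.Println("dirty connection error:", err)
}
```

The clean case yields status 200 with no error, while the dirty case fails on the first line, which matches the shape of the "cannot find whitespace in the first line of response" error.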

makasim commented 1 month ago

I was able to catch the same error on fasthttp v1.55.0. It is a kind of API gateway where the server is the standard library HTTP server and the upstream client is fasthttp (I am trying to migrate to it). I have been trying to reproduce the issue, but without luck so far. Any hints or ideas on how to reproduce it would be much appreciated.

For starters, I am posting snippets of the code where I got the error. I hope it helps.

new client:

func NewFastClient() *fasthttp.Client {
    return &fasthttp.Client{
        NoDefaultUserAgentHeader: true,
        MaxConnsPerHost:          50000,
        MaxIdleConnDuration:      time.Second * 50,
        ReadTimeout:              time.Second * 50,
        WriteTimeout:             time.Second * 50,
        StreamResponseBody:       true,
    }
}

serve (stripped version):

fReq := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(fReq)

fReq.Header.DisableSpecialHeader()
fReq.SetRequestURI(targetURL)
fReq.SetBody(ctx.RequestBody)
fReq.Header.SetMethod(method)

// set X-Forwarded-Host to public host name
fReq.Header.Set("X-Forwarded-Host", publicHost)
fReq.Header.Set("Connection", "keep-alive")
fReq.Header.Set("Host", upstreamHost)
fReq.Header.Add("Via", requestProto+" api-gateway")

fResp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(fResp)

fResp.StreamBody = true

if err := h.httpClient.DoTimeout(fReq, fResp, time.Second*50); isTimeout(err) {
  http.Error(downResp, `gateway timeout`, http.StatusGatewayTimeout)
  return
} else if err != nil {
  l.Error("http client: do failed", logger.Error(err), "err_code", errCode)

  //Here I caught the error:
  // error when reading response headers: cannot find whitespace in the first line of response
  http.Error(downResp, `bad gateway`, http.StatusBadGateway)
  return
}
defer func() {
  if err := fResp.CloseBodyStream(); err != nil {
    l.Error("failed to close response body", logger.Error(err))
  }
}()

fResp.Header.VisitAll(func(key, value []byte) {
  downResp.Header().Add(string(key), string(value))
})
downResp.WriteHeader(fResp.StatusCode())

if stream := fResp.BodyStream(); stream != nil {
  if _, err := io.Copy(downResp, stream); err != nil {
    return fmt.Errorf("write response: %w", err)
  }
}