census-instrumentation / opencensus-go

A stats collection and distributed tracing framework
http://opencensus.io
Apache License 2.0
2.05k stars 326 forks source link

trackingResponseWriter does not measure payloads served using ReadFrom #1241

Open FilipVozar opened 3 years ago

FilipVozar commented 3 years ago

What version of OpenCensus are you using?

v0.22.4

What version of Go are you using?

1.14

What did you do?

Use ochttp.Handler to measure server response size. Use http.ServeContent method to serve content.

What did you expect to see?

I expected that server response body size would be measured in the ochttp.ServerResponseBytes stat.

What did you see instead?

Response body size is not measured (metric value stays at 0).

Additional context

http.ServeContent uses io.Copy, passing it an io.LimitedReader as the source. This triggers the code path in which io.Copy calls ReadFrom(io.Reader) on the destination to do the copy. When wrapping the http.ResponseWriter, ochttp.trackingResponseWriter exposes the underlying http.ResponseWriter as the io.ReaderFrom. As a result, responses written via ReadFrom(io.Reader) are not measured, because the call to ReadFrom(io.Reader) bypasses ochttp.trackingResponseWriter.

The program below reproduces the issue. It also shows that implementing io.ReaderFrom on the wrapper fixes it (naive implementation).

package main

import (
    "context"
    "fmt"
    "io"
    "net/http"
    "net/http/httptest"
    "strings"
    "time"

    "contrib.go.opencensus.io/exporter/prometheus"
    stdprometheus "github.com/prometheus/client_golang/prometheus"
    "go.opencensus.io/plugin/ochttp"
    "go.opencensus.io/stats"
    "go.opencensus.io/stats/view"
)

// main runs the reproduction. It registers the demo routes, starts a test
// server wrapped in ochttp.Handler, and requests each endpoint in turn,
// printing the measured response-bytes total before and after every request.
//
// The zero-value ochttp.Handler is expected to dispatch to
// http.DefaultServeMux, which is where setupRoutes registers its handlers.
func main() {
    setupRoutes()
    srv := httptest.NewServer(&ochttp.Handler{})
    defer srv.Close()

    registry := setupRegistry()

    sizeBefore := 0
    for _, endpoint := range []string{"/simple", "/broken", "/fixed"} {
        resp, err := http.Get(srv.URL + endpoint)
        if err != nil {
            panic(err)
        }
        // The payload itself is irrelevant here, but the body must still be
        // closed so the underlying connection is not leaked. A close error
        // has no bearing on the measurement, so it is deliberately ignored.
        _ = resp.Body.Close()

        sizeAfter := measuredSize(registry)
        fmt.Printf("endpoint: %s, before: %d, after: %d\n", endpoint, sizeBefore, sizeAfter)
        sizeBefore = sizeAfter
    }
}

// setupRoutes registers three handlers on the default serve mux, each of
// which responds with the same one-byte payload via a different write path:
//
//   - /simple writes the bytes directly with Write.
//   - /broken serves them through http.ServeContent on the writer it is given.
//   - /fixed serves them through http.ServeContent on a withReadFrom wrapper,
//     demonstrating that implementing io.ReaderFrom fixes the issue.
func setupRoutes() {
    const payload = "x"

    http.HandleFunc("/simple", func(w http.ResponseWriter, _ *http.Request) {
        w.Write([]byte(payload))
    })

    http.HandleFunc("/broken", func(w http.ResponseWriter, r *http.Request) {
        http.ServeContent(w, r, "", time.Time{}, strings.NewReader(payload))
    })

    http.HandleFunc("/fixed", func(w http.ResponseWriter, r *http.Request) {
        // Wrap the writer so that ReadFrom is intercepted and measured.
        wrapped := withReadFrom{Context: r.Context(), ResponseWriter: w}
        http.ServeContent(wrapped, r, "", time.Time{}, strings.NewReader(payload))
    })
}

// withReadFrom wraps an http.ResponseWriter together with the request's
// context so that its ReadFrom method can record the number of bytes copied.
// Both fields are embedded, so the value can be passed wherever a
// context.Context or an http.ResponseWriter is expected.
type withReadFrom struct {
    // Context is passed to stats.Record when ReadFrom records a measurement.
    context.Context
    // ResponseWriter is the wrapped writer that actually receives the data.
    http.ResponseWriter
}

// ReadFrom implements io.ReaderFrom. It copies src into the wrapped
// ResponseWriter and records the number of bytes copied as an
// ochttp.ServerResponseBytes measurement against the embedded context.
//
// When the wrapped writer itself implements io.ReaderFrom the copy is
// delegated to it; otherwise the data is copied with io.Copy instead of
// panicking on a failed type assertion. The measurement is recorded even if
// the copy returns an error, so partially written responses are still counted.
//
// It returns the number of bytes copied and any error from the copy.
func (f withReadFrom) ReadFrom(src io.Reader) (int64, error) {
    var (
        size int64
        err  error
    )
    if rf, ok := f.ResponseWriter.(io.ReaderFrom); ok {
        size, err = rf.ReadFrom(src)
    } else {
        // The wrapped writer has no ReadFrom, so io.Copy cannot recurse
        // back into this method.
        size, err = io.Copy(f.ResponseWriter, src)
    }
    stats.Record(f, ochttp.ServerResponseBytes.M(size))
    return size, err
}

// setupRegistry creates a fresh Prometheus registry, registers an OpenCensus
// Prometheus exporter backed by it, and registers
// ochttp.ServerResponseBytesView. It returns the registry so callers can
// gather metrics from it, and panics if either the exporter cannot be created
// or the view cannot be registered.
func setupRegistry() *stdprometheus.Registry {
    registry := stdprometheus.NewRegistry()

    exporter, err := prometheus.NewExporter(prometheus.Options{Registry: registry})
    if err != nil {
        panic(err)
    }
    view.RegisterExporter(exporter)

    err = view.Register(ochttp.ServerResponseBytesView)
    if err != nil {
        panic(err)
    }

    return registry
}

// measuredSize gathers the metrics in reg and returns the histogram sample
// sum of the first metric in the "opencensus_io_http_server_response_bytes"
// family, truncated to an int. It panics if gathering fails or if no family
// with that name is present.
func measuredSize(reg *stdprometheus.Registry) int {
    const wanted = "opencensus_io_http_server_response_bytes"

    families, err := reg.Gather()
    if err != nil {
        panic(err)
    }

    for i := range families {
        family := families[i]
        if *family.Name != wanted {
            continue
        }
        sum := *family.Metric[0].Histogram.SampleSum
        return int(sum)
    }
    panic("metric not found")
}

output:

endpoint: /simple, before: 0, after: 1
endpoint: /broken, before: 1, after: 1
endpoint: /fixed, before: 1, after: 2