cloudevents / sdk-go

Go SDK for CloudEvents
https://cloudevents.github.io/sdk-go/
Apache License 2.0
786 stars 217 forks source link

`req.Body` not reset in `doWithRetry` #773

Closed embano1 closed 2 years ago

embano1 commented 2 years ago

Scenario:

When using a net/http HTTP server as remote/receiver (instead of the receiver from this package), ceClient has issues when retrying a non-successful HTTP response.

Error: "msg":"Invalid result type, not HTTP Result: Post \"http://always-fail.default.127.0.0.1.sslip.io\": http: ContentLength=34 with Body length 0 (type: *protocol.Receipt)"}

This happens when the remote server returns a message in the response body, e.g. "server down" (not a CloudEvent), but not when it returns just a status code with an empty response body.

The issue is caused by func (p *Protocol) doWithRetry() because it does not reset the req.Body during subsequent attempts.

Expected behavior:

Return encoding error, but correctly reset req.Body, e.g. return "failed to convert response into event: unknown Message encoding\n500: (3x)"

Client code:

/*
 Copyright 2021 The CloudEvents Authors
 SPDX-License-Identifier: Apache-2.0
*/

package main

import (
    "context"
    "fmt"
    "net/http"
    "time"

    "github.com/cloudevents/sdk-go/observability/opencensus/v2/client"
    "go.uber.org/zap"
    "knative.dev/eventing/pkg/kncloudevents"
    "knative.dev/pkg/logging"

    cloudevents "github.com/cloudevents/sdk-go/v2"
    "github.com/cloudevents/sdk-go/v2/protocol"
    cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
)

// main sends a single sample CloudEvent to a fixed target URL with a linear
// retry backoff configured on the context, then reports whether delivery
// succeeded according to isSuccess.
func main() {
    ctx := cloudevents.ContextWithTarget(context.Background(), "http://always-fail.default.127.0.0.1.sslip.io")

    // The closure captures the ctx variable itself (not a copy), so any call
    // made after ctx is reassigned below sees the reassigned context.
    isRetriable := func(statusCode int) bool {
        retry, _ := kncloudevents.SelectiveRetry(ctx, &http.Response{StatusCode: statusCode}, nil)
        return retry
    }
    transportOpts := []cehttp.Option{cehttp.WithIsRetriableFunc(isRetriable)}

    ceClient, err := client.NewClientHTTP(transportOpts, nil)
    if err != nil {
        panic(err)
    }

    event := cloudevents.NewEvent()
    event.SetType("com.cloudevents.sample.sent")
    event.SetSource("https://github.com/cloudevents/sdk-go/v2/samples/httpb/sender")
    payload := map[string]interface{}{
        "id":      1,
        "message": "Hello, World!",
    }
    // The SetData error is deliberately ignored in this sample.
    _ = event.SetData(cloudevents.ApplicationJSON, payload)

    // Linear backoff: 10ms period, 3 retries.
    ctx = cloudevents.ContextWithRetriesLinearBackoff(ctx, 10*time.Millisecond, 3)
    resp, res := ceClient.Request(ctx, event)

    if isSuccess(ctx, res) {
        fmt.Printf("got: %v", resp)
        return
    }

    logging.FromContext(ctx).Errorw("Failed to deliver", zap.Error(res))
}

// isSuccess reports whether result represents a delivery that needs no retry.
//
// result must unwrap to a *cehttp.RetriesResult whose inner Result unwraps to
// a *cehttp.Result; otherwise a warning is logged and false is returned. For
// a valid HTTP result, the status code is passed to
// kncloudevents.SelectiveRetry and the negation of its retry decision is
// returned.
func isSuccess(ctx context.Context, result protocol.Result) bool {
    var retries *cehttp.RetriesResult
    if !cloudevents.ResultAs(result, &retries) {
        logging.FromContext(ctx).Warnf("Invalid result type, not RetriesResult")
        return false
    }

    var httpRes *cehttp.Result
    if !cloudevents.ResultAs(retries.Result, &httpRes) {
        logging.FromContext(ctx).Warnf("Invalid result type, not HTTP Result: %v (type: %T)", retries.Result, retries.Result)
        return false
    }

    // Only the status code is known here, so a minimal response is synthesized
    // for the retry check; its error return is ignored.
    shouldRetry, _ := kncloudevents.SelectiveRetry(ctx, &http.Response{StatusCode: httpRes.StatusCode}, nil)
    return !shouldRetry
}

HTTP server code (note: just using net/http)

package main

import (
    "context"
    "errors"
    "net/http"
    "time"

    "github.com/go-chi/chi/v5"
    "github.com/go-chi/chi/v5/middleware"
    "knative.dev/pkg/logging"
    "knative.dev/pkg/signals"
)

const (
    // addr is the listen address assigned to the HTTP server's Addr field.
    addr    = ":8080"
    // timeout is used as both the read and the write timeout of the HTTP server.
    timeout = 5 * time.Second
)

// main runs a plain net/http server (routed with chi) whose only route,
// POST /, always replies with 500 and a non-empty text body. It serves until
// the signal context is cancelled, then shuts down gracefully and waits for
// the shutdown to complete before returning.
func main() {
    ctx := signals.NewContext()

    r := chi.NewRouter()
    r.Use(middleware.Logger)
    r.Post("/", func(w http.ResponseWriter, r *http.Request) {
        // w.WriteHeader(http.StatusInternalServerError) <- just doing this works!
        http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
    })

    srv := http.Server{
        Addr:         addr,
        Handler:      r,
        ReadTimeout:  timeout,
        WriteTimeout: timeout,
    }

    // shutdownDone is closed once srv.Shutdown has returned, so main can wait
    // for in-flight requests to drain instead of exiting mid-shutdown.
    shutdownDone := make(chan struct{})
    go func() {
        defer close(shutdownDone)

        <-ctx.Done()
        logging.FromContext(ctx).Info("shutting down")
        if err := srv.Shutdown(context.Background()); err != nil {
            logging.FromContext(ctx).Warnf("could not shutdown: %v", err)
        }
    }()

    logging.FromContext(ctx).Infow("listening", "address", addr)
    if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
        logging.FromContext(ctx).Fatal(err)
    }

    // ListenAndServe returns ErrServerClosed as soon as Shutdown is called,
    // not when it finishes; block here until Shutdown has actually returned.
    <-shutdownDone
}

Patch

diff --git a/v2/protocol/http/protocol_retry.go b/v2/protocol/http/protocol_retry.go
index fb7bcd2..9050469 100644
--- a/v2/protocol/http/protocol_retry.go
+++ b/v2/protocol/http/protocol_retry.go
@@ -6,8 +6,10 @@
 package http

 import (
+   "bytes"
    "context"
    "errors"
+   "io"
    "net/http"
    "net/url"
    "time"
@@ -53,6 +55,17 @@ func (p *Protocol) doWithRetry(ctx context.Context, params *cecontext.RetryParam
    retry := 0
    results := make([]protocol.Result, 0)

+   var (
+       body []byte
+       err  error
+   )
+   if req.Body != nil {
+       body, err = io.ReadAll(req.Body)
+       if err != nil {
+           panic(err)
+       }
+   }
+
    for {
        msg, result := p.doOnce(req)

@@ -90,6 +103,13 @@ func (p *Protocol) doWithRetry(ctx context.Context, params *cecontext.RetryParam
        }

    DoBackoff:
+       if req.Body != nil {
+           req.Body = io.NopCloser(bytes.NewReader(body))
+           req.GetBody = func() (io.ReadCloser, error) {
+               return io.NopCloser(bytes.NewReader(body)), nil
+           }
+       }
+
        // Wait for the correct amount of backoff time.

        // total tries = retry + 1
n3wscott commented 2 years ago

nice! Want to make a PR?

embano1 commented 2 years ago

nice! Want to make a PR?

Sure, I just need to dig a little deeper as there might be another anomaly in the way requests are handled. Not sure I get the full picture yet.

embano1 commented 2 years ago

Update:

I can reproduce the issue with net/http using the following handlers:

// Replies with status 500 and sends the status text as the response body.
func(w http.ResponseWriter, r *http.Request) {
        http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
}
// Replies with status 500 only; the handler itself writes no response body.
func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusInternalServerError)
}

The second handler does not always show the http: ContentLength=34 with Body length 0 *url.Error, but I managed to catch it several times with a debugger.

I was also able to catch it once using the cloudevents receiver and a simple event handler:

// receive prints every incoming event and always answers with an HTTP 500
// result whose message is the standard status text for that code.
func receive(ctx context.Context, event cloudevents.Event) error {
    const status = nethttp.StatusInternalServerError

    fmt.Printf("got event: %v\n", event)
    return http.NewResult(status, nethttp.StatusText(status))
}

Still not sure where and how it gets triggered in all these (rare) scenarios, but perhaps there's a race which only shows during debugging - typical Heisenbug :D

Also, I was wondering why the unit tests did not exhibit the problem: they only use a RoundTripper mock instead of a "full" HTTP server that reads/modifies the body. The tests also did not send a request body, which is another reason we did not see this.