opensearch-project / opensearch-go

Go Client for OpenSearch
https://opensearch.org/docs/latest/clients/go/
Apache License 2.0

[BUG] Error http: ContentLength=401 with Body length 0 #541

Closed merlinz01 closed 1 month ago

merlinz01 commented 1 month ago

What is the bug?

I'm getting lots of these errors when indexing documents into a data stream: http: ContentLength=401 with Body length 0

How can one reproduce the bug?

Here's my code:

func (logger *MyLogger) SendLogEvent(event *MyLogEvent) error {
    // Marshal the struct
    s, err := json.Marshal(event)
    if err != nil {
        return WrapError("failed to marshal log event", err)
    }
    // Do the request
    req, err := opensearchapi.IndexReq{
        Index: logger.logsIndexName,
        Body: bytes.NewReader(s),
    }.GetRequest()
    if err != nil {
        return WrapError("failed to create OpenSearch index request", err)
    }
    resp, err := logger.client.Perform(req)
    // Make sure to close the body so the connection can be reused
    if resp != nil && resp.Body != nil {
        defer resp.Body.Close()
    }
    if resp == nil {
        return err
    }
    if err != nil || resp.StatusCode != 201 {
        // Log the error
        return errors.New("failed to log event, statuscode " + resp.Status)
    }
    // Still read the body so the connection can be reused
    io.Copy(io.Discard, resp.Body)
    // Success
    return nil
}

What is the expected behavior?

No error.

What is your host/environment?

Debian 12 and Linux Mint 21.3; opensearch-go v4.0.0

Do you have any screenshots?

Do you have any additional context?

Jakob3xD commented 1 month ago

Hey, thanks for opening the issue. Using client.Perform() directly is not recommended because it skips all of the client's error handling. The correct way would be the following:

func (logger *MyLogger) SendLogEvent(event *MyLogEvent) error {
    // Marshal the struct
    s, err := json.Marshal(event)
    if err != nil {
        return WrapError("failed to marshal log event", err)
    }

    // Do the request
    // err is already declared above, so assign with = rather than :=
    _, err = logger.client.Index(
        context.Background(),
        opensearchapi.IndexReq{
            Index: logger.logsIndexName,
            Body: bytes.NewReader(s),
        },
    )
    if err != nil {
        return err
    }
    // Success
    return nil
}

merlinz01 commented 1 month ago

OK, this morning I'm not getting the errors. We had terrible connectivity yesterday and I think that was causing it. (I think I had changed the settings, see my next comment)

I found where the error was being raised: /usr/local/go/src/net/http/transfer.go line 391

            ncopy, err = t.doBodyCopy(w, io.LimitReader(body, t.ContentLength))
            if err != nil {
                return err
            }
            var nextra int64
            nextra, err = t.doBodyCopy(io.Discard, body)
            ncopy += nextra
        }
        if err != nil {
            return err
        }
    }
...
    if !t.ResponseToHEAD && t.ContentLength != -1 && t.ContentLength != ncopy {
        return fmt.Errorf("http: ContentLength=%d with Body length %d",
            t.ContentLength, ncopy)
    }

Seems like not all of the request body was being written yet there were no I/O errors.
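
The check quoted above can be exercised without a network connection. The following is a minimal sketch under stated assumptions, not the client's own code: it declares a Content-Length of 401 on a request whose body reader has already been drained, then serializes the request with `http.Request.Write`, which runs the same body-length check. The URL and the helper name `staleBodyError` are hypothetical.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

// staleBodyError serializes a request whose declared Content-Length is 401
// but whose body reader has already been drained, and returns the resulting
// error text. The URL is a placeholder; nothing is sent over the network.
func staleBodyError() string {
	buf := bytes.NewBuffer(make([]byte, 401))

	req, err := http.NewRequest(http.MethodPost, "http://localhost:9200/logs/_doc", nil)
	if err != nil {
		return err.Error()
	}
	req.Body = io.NopCloser(buf)
	req.ContentLength = int64(buf.Len()) // 401

	// Simulate a first attempt that consumed the whole body.
	if _, err := io.Copy(io.Discard, buf); err != nil {
		return err.Error()
	}

	// Writing the request again finds zero bytes left to send,
	// which no longer matches the declared length.
	if err := req.Write(io.Discard); err != nil {
		return err.Error()
	}
	return ""
}

func main() {
	fmt.Println(staleBodyError())
	// prints: http: ContentLength=401 with Body length 0
}
```

Reading zero bytes from an exhausted reader is not an I/O error, which is why the only symptom is the length mismatch.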

Thanks for the info on how to use the API. I have updated my code.

dblock commented 1 month ago

@merlinz01 Glad this was helpful. Should we close this?

merlinz01 commented 1 month ago

Now I'm getting some of these errors again even though the connectivity has much improved. So far they only show up when I set CompressRequestBody to true. I wonder if there is a bug in the gzipping of the requests.

merlinz01 commented 1 month ago

OK, I think I understand it now. When the transport retries a failed attempt, the request body on the second attempt is an already-consumed bytes.Buffer. It re-reads the body, but since the buffer was fully read the first time, the result is zero bytes. That empty body is passed to the writer, which writes zero bytes successfully, and net/http then reports that zero bytes were written although the declared Content-Length is XXX bytes.

https://github.com/opensearch-project/opensearch-go/blob/ea3e57ab09376988939f0564e6e7176ddb2cd8b8/opensearchtransport/opensearchtransport.go#L239-L268

On line 248, an io.nopCloser for the exact same (possibly already consumed) bytes.Buffer is returned every time req.GetBody is called.
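
The effect described above can be seen in isolation with the standard library. This is a hedged sketch, not the transport's code: `sharedBufferReads` and `drain` are hypothetical helpers, and the closure only imitates a `GetBody` function that returns the same `*bytes.Buffer` on every call.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// sharedBufferReads calls a GetBody-style function twice and reports how many
// bytes each returned body yields. The function hands out the same
// *bytes.Buffer every time, which imitates the behavior described above.
func sharedBufferReads(payload []byte) (first, second int) {
	buf := bytes.NewBuffer(payload)
	getBody := func() (io.ReadCloser, error) {
		return io.NopCloser(buf), nil // same buffer, same read offset
	}
	first = drain(getBody)
	second = drain(getBody)
	return first, second
}

// drain reads a body to the end and returns the number of bytes read,
// or -1 if obtaining or reading the body failed.
func drain(getBody func() (io.ReadCloser, error)) int {
	body, err := getBody()
	if err != nil {
		return -1
	}
	defer body.Close()
	data, err := io.ReadAll(body)
	if err != nil {
		return -1
	}
	return len(data)
}

func main() {
	first, second := sharedBufferReads([]byte(`{"message":"hello"}`))
	// The first call yields the whole payload; the second yields nothing.
	fmt.Println(first, second)
}
```

The second body is empty because both calls share one read offset, so a retry has nothing left to send.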

merlinz01 commented 1 month ago

Here's how I fixed it:

            req.GetBody = func() (io.ReadCloser, error) {
                // We have to return a new reader each time so that retries don't read from an already-consumed body.
                // This does not do any copying of the data.
                return io.NopCloser(bytes.NewReader(buf.Bytes())), nil
            }

The code for non-gzipped bodies (line 261) does a similar thing by implicitly copying buf by value before its internal state changes. That makes it less clear what is happening, so maybe we should update it to match this fix.

I can open a PR if you want.

dblock commented 1 month ago

> I can open a PR if you want.

Most definitely, with tests please.

I think ideally the reader would be rewound only if retry is needed.
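
One way to read this suggestion is a retry loop that calls `GetBody` only before a retry and leaves the first attempt untouched. The sketch below is an illustration under assumptions and not the opensearch-go transport: `sendFunc`, `doWithRetry`, and `retryBodyLengths` are hypothetical names, the URL is a placeholder, and the sender fails once on purpose. It relies on `http.NewRequest` populating `GetBody` when the body is a `*bytes.Reader`.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"net/http"
)

// sendFunc is a hypothetical stand-in for the transport's round trip.
type sendFunc func(*http.Request) error

// doWithRetry sends req up to maxAttempts times. The body is rewound through
// req.GetBody only before a retry, never before the first attempt.
func doWithRetry(req *http.Request, maxAttempts int, send sendFunc) error {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if attempt > 0 && req.GetBody != nil {
			body, gerr := req.GetBody()
			if gerr != nil {
				return gerr
			}
			req.Body = body
		}
		if err = send(req); err == nil {
			return nil
		}
	}
	return err
}

// retryBodyLengths runs doWithRetry against a sender that fails once and
// returns the body length observed on each attempt.
func retryBodyLengths(payload []byte) []int {
	req, err := http.NewRequest(http.MethodPost, "http://localhost:9200/logs/_doc", bytes.NewReader(payload))
	if err != nil {
		return nil
	}
	var seen []int
	send := func(r *http.Request) error {
		data, _ := io.ReadAll(r.Body)
		seen = append(seen, len(data))
		if len(seen) == 1 {
			return errors.New("simulated connection failure")
		}
		return nil
	}
	_ = doWithRetry(req, 3, send)
	return seen
}

func main() {
	// Both attempts should observe the full payload length.
	fmt.Println(retryBodyLengths([]byte(`{"message":"hello"}`)))
}
```

With this shape, `GetBody` still has to return an independent reader on each call; rewinding lazily only avoids the extra call on requests that succeed the first time.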

merlinz01 commented 1 month ago

We could implicitly copy the buffer, as is done for non-gzipped bodies, but IMHO it's better to be explicit. The second method below makes only two small struct allocations, so it's not really a performance question.

            req.GetBody = func() (io.ReadCloser, error) {
                b := *buf
                return io.NopCloser(&b), nil
            }

vs.

            req.GetBody = func() (io.ReadCloser, error) {
                reader := bytes.NewReader(buf.Bytes())
                return io.NopCloser(reader), nil
            }
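
The two variants can be compared side by side outside the transport. This is a small sketch with hypothetical helper names (`copyByValue`, `freshReader`, `readTwice`, `fixesAgree`); it checks only that each variant returns the full payload on repeated calls and leaves the original buffer unread.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// copyByValue returns a GetBody-style function that copies the bytes.Buffer
// struct, so each call gets its own read offset over the shared bytes.
func copyByValue(buf *bytes.Buffer) func() (io.ReadCloser, error) {
	return func() (io.ReadCloser, error) {
		b := *buf
		return io.NopCloser(&b), nil
	}
}

// freshReader returns a GetBody-style function that wraps the buffer's
// unread bytes in a new bytes.Reader on every call.
func freshReader(buf *bytes.Buffer) func() (io.ReadCloser, error) {
	return func() (io.ReadCloser, error) {
		return io.NopCloser(bytes.NewReader(buf.Bytes())), nil
	}
}

// readTwice calls getBody twice and returns both bodies as strings.
func readTwice(getBody func() (io.ReadCloser, error)) (string, string) {
	var out [2]string
	for i := range out {
		body, err := getBody()
		if err != nil {
			return "", ""
		}
		data, _ := io.ReadAll(body)
		body.Close()
		out[i] = string(data)
	}
	return out[0], out[1]
}

// fixesAgree reports whether both variants return the full payload on two
// consecutive calls while leaving the original buffer unread.
func fixesAgree(payload string) bool {
	buf := bytes.NewBufferString(payload)
	a1, a2 := readTwice(copyByValue(buf))
	b1, b2 := readTwice(freshReader(buf))
	return a1 == payload && a2 == payload &&
		b1 == payload && b2 == payload &&
		buf.Len() == len(payload)
}

func main() {
	fmt.Println(fixesAgree(`{"message":"hello"}`))
}
```

Neither variant copies the underlying bytes; both only create a new read position over the same backing array.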

merlinz01 commented 1 month ago

See #543. (#542 can be ignored; I committed to the wrong branch.)