elastic / go-elasticsearch

The official Go client for Elasticsearch
https://github.com/elastic/go-elasticsearch#go-elasticsearch
Apache License 2.0

How to let the client reuse HTTP connections? #781

Closed RyanChen1997 closed 10 months ago

RyanChen1997 commented 10 months ago

Elasticsearch version: V8.11.0 go-elasticsearch/v8

I have successfully set up Elasticsearch on my local machine using Docker. To create a client, I have set the MaxIdleConnsPerHost parameter to 10. Now, I am indexing documents using the bulk API. Since there are a large number of documents, it may take some time to complete the process.

func NewClientV8(ctx context.Context) *elasticsearch.Client {
    cert, err := os.ReadFile("http_ca.crt") // os.ReadFile replaces the deprecated ioutil.ReadFile
    if err != nil {
        log.Fatalf("Failed to load certificate: %v", err)
    }

    certPool := x509.NewCertPool()
    if !certPool.AppendCertsFromPEM(cert) {
        log.Fatal("No certificates could be parsed from http_ca.crt")
    }

    cfg := elasticsearch.Config{
        Addresses: []string{
            "https://localhost:9200",
        },
        Username: "elastic",
        Password: "bghCDHWKJknLrkr+8ZMF",
        Transport: &http.Transport{
            MaxIdleConnsPerHost: 20,
            TLSClientConfig: &tls.Config{
                RootCAs: certPool,
            },
        },
    }

    es, err := elasticsearch.NewClient(cfg)
    if err != nil {
        log.Fatalf("Error creating the client: %s", err)
    }

    res, err := es.Info()
    if err != nil {
        log.Fatalf("Error getting response: %s", err)
    }
    defer res.Body.Close()

    log.Println(res)
    return es
}

func (*ES8) Bulk(ctx context.Context, docs []BulkRequest) error {

    bulkRequestBody := strings.Builder{}
    for _, doc := range docs {
        metaData := fmt.Sprintf(`{ "index" : { "_index" : "%s", "_id" : "%s" } }%s`, index, doc.Id, "\n")
        docData, err := json.Marshal(doc.Doc)
        if err != nil {
            log.Printf("Error marshaling document: %v", err)
            continue
        }
        bulkRequestBody.WriteString(metaData)
        bulkRequestBody.Write(docData)
        bulkRequestBody.WriteString("\n")
    }

    // Send the bulk request
    req := esapi.BulkRequest{
        Index:   index,
        Body:    strings.NewReader(bulkRequestBody.String()),
        Refresh: "true",
        Pretty:  true,
    }
    res, err := req.Do(ctx, es)
    if err != nil {
        // Return the error to the caller instead of exiting the process with log.Fatalf.
        log.Printf("Error performing bulk request: %s", err)
        return err
    }
    defer res.Body.Close()

    // if res.IsError() {
    //  log.Printf("Error[%s]: %s", res.Status(), res.String())
    // } else {
    //  log.Println("Bulk insert successful", res.String())
    // }
    return nil
}

Initially, it worked perfectly fine. Near the end of the run, however, I got this error: Error performing bulk request: read tcp 127.0.0.1:50989->127.0.0.1:9200: read: connection reset by peer. I also noticed that the number of open HTTP connections kept increasing while the process was running.

Does the client reuse HTTP connections by default? If not, how can I make it reuse them? Thanks for your reply!

RyanChen1997 commented 10 months ago

I found a similar issue: https://github.com/elastic/go-elasticsearch/issues/123. The root cause is that I closed the response body without reading its content. I added one line, res.String(), which reads the body, and the problem is resolved. I will close this issue.