influxdata / telegraf

Agent for collecting, processing, aggregating, and writing metrics, logs, and other arbitrary data.
https://influxdata.com/telegraf
MIT License

outputs.elasticsearch: Not dropping bad metrics after failure #14110

Closed Livel-tempest closed 11 months ago

Livel-tempest commented 1 year ago

Relevant telegraf.conf

[[outputs.elasticsearch]]
  urls = ["http://elasticsearch:10086"]
  timeout = 5
  enable_sniffer = false
  index_name = "region-00000000000000000000000000000000@mcelog_log"
  enable_gzip = true
  health_check_interval = 0
  [outputs.elasticsearch.tagpass]
      index = ["mcelog_log"]

Logs from Telegraf

Oct 15 18:20:25 pve telegraf[1560016]: 2023-10-15T16:20:25Z D! [outputs.elasticsearch] elasticsearch failed to index 2 metrics

System info

Telegraf 1.28.1 (git: HEAD@3ea9ffbe), Debian GNU/Linux 11 (bullseye), Proxmox

Docker

No response

Steps to reproduce

  1. The es output plugin writes data into es in batches.
  2. Part of the data fails to be written to es ...

Expected behavior

Only the data in the current batch that failed to be written to es should be rewritten.

Actual behavior

All the data in the current batch is rewritten.

Additional info

The es output plugin writes metrics into es in batches. When part of the data fails to be written, all the data in the current batch is rewritten, which eventually leads to duplicate data in es.

powersj commented 1 year ago

Hi,

Looking at the current code, when we get a response back from the server after sending the request, we check the response itself for errors. We currently return any errors we find, which causes telegraf to buffer the metrics and, as you are seeing, never drop any "bad" metrics.

If I had to guess, the reason for this logic is that some of the errors received may be retryable (e.g. 429 Too Many Requests), in which case a user would not want us to drop the metrics, but instead retry the request. We do something like this in the influxdb output. However, the Elasticsearch plugin currently does not differentiate between retryable and non-retryable errors.

I have put up PR #14115 with a change to the error messages to include the status code. Could you please run with that PR, and provide the complete logs which show the status code you are receiving? With that in hand we could add some logic to not retry on certain error codes like we do in other places.
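
For illustration, a rough sketch (not the actual PR) of what that differentiation could look like inside the plugin's error handling, assuming each failed bulk item exposes a numeric Status alongside the Error.Reason already used in the current code:

    // Sketch only: separate retryable from non-retryable bulk failures.
    if res.Errors {
        retryable := false
        for _, item := range res.Failed() {
            if item.Status == 429 || item.Status >= 500 {
                // Overload or server-side failures: return an error below so
                // Telegraf keeps the batch buffered and retries it later.
                retryable = true
                continue
            }
            // Client-side failures (e.g. mapping conflicts) will never
            // succeed; log them so they can be dropped instead of retried.
            reason := ""
            if item.Error != nil {
                reason = item.Error.Reason
            }
            a.Log.Errorf("dropping metric, status: %d, reason: %s", item.Status, reason)
        }
        if retryable {
            return fmt.Errorf("elasticsearch failed to index %d metrics", len(res.Failed()))
        }
    }
    return nil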

Thanks!

Livel-tempest commented 1 year ago

I ran with that PR as you said, and the logs I got are as follows:

Oct 17 10:10:25 pve telegraf[1560016]: 2023-10-17T10:10:25Z D! [outputs.elasticsearch] Elasticsearch indexing failure, id: 0, status:429, error: rejected execution of primary operation [coordinating_and_primary_bytes=1685485309, replica_bytes=905748830, all_bytes=2591234139, primary_operation_bytes=1835748, max_coordinating_and_primary_bytes=2576980377], caused by: %!s(), %!s()

powersj commented 1 year ago

Looks like a 429 error, which usually indicates too many requests. This is generally an example of an error code where we would in fact want to buffer the metrics so they are not lost.

You have not shared your full logs, so it is difficult to understand what is going on at the time, but are you certain you are not hitting some limit?

Livel-tempest commented 1 year ago

When our system triggers full data synchronization, a large number of other requests arrive at es. Because telegraf's es output plugin performs a batch write operation, some data in the same batch is written successfully while other data fails to be written to es with a status code of 429. Telegraf then puts all the data in this batch back into the buffer (regardless of whether it was written successfully in the previous batch write) and rewrites it.

Livel-tempest commented 1 year ago

telegraf/plugins/outputs/elasticsearch/elasticsearch.go

    ctx, cancel := context.WithTimeout(context.Background(), time.Duration(a.Timeout))
    defer cancel()

    res, err := bulkRequest.Do(ctx)

    if err != nil {
        return fmt.Errorf("error sending bulk request to Elasticsearch: %w", err)
    }

    if res.Errors {
        for id, err := range res.Failed() {
            a.Log.Errorf(
                "Elasticsearch indexing failure, id: %d, error: %s, caused by: %s, %s",
                id,
                err.Error.Reason,
                err.Error.CausedBy["reason"],
                err.Error.CausedBy["type"],
            )
            break
        }
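        // Returning an error here hands the entire batch back to the output
        // model, which will reject and retry every metric in the batch, even
        // the ones that were already indexed successfully.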
        return fmt.Errorf("elasticsearch failed to index %d metrics", len(res.Failed()))
    }

When the bulk operation fails, an error is returned here, which then propagates up to the output model:

// Write writes all metrics to the output, stopping when all have been sent on
// or error.
func (r *RunningOutput) Write() error {
    if output, ok := r.Output.(octopus.AggregatingOutput); ok {
        r.aggMutex.Lock()
        metrics := output.Push()
        r.buffer.Add(metrics...)
        output.Reset()
        r.aggMutex.Unlock()
    }

    atomic.StoreInt64(&r.newMetricsCount, 0)

    // Only process the metrics in the buffer now.  Metrics added while we are
    // writing will be sent on the next call.
    nBuffer := r.buffer.Len()
    nBatches := nBuffer/r.MetricBatchSize + 1
    for i := 0; i < nBatches; i++ {
        batch := r.buffer.Batch(r.MetricBatchSize)
        if len(batch) == 0 {
            break
        }

        err := r.write(batch)
        if err != nil {
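            // Any error from the plugin rejects the entire batch; there is no
            // way to report that only part of the batch failed.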
            r.buffer.Reject(batch)
            return err
        }
        r.buffer.Accept(batch)
    }
    return nil
}
// Reject returns the batch, acquired from Batch(), to the buffer and marks it
// as unsent.
func (b *Buffer) Reject(batch []octopus.Metric) {
    slogger.Debugf("reject len is %d", len(batch))
    b.Lock()
    defer b.Unlock()

    if len(batch) == 0 {
        return
    }

    free := b.cap - b.size
    restore := min(len(batch), free)
    skip := len(batch) - restore

    b.first = b.prevby(b.first, restore)
    b.size = min(b.size+restore, b.cap)

    re := b.first

    // Copy metrics from the batch back into the buffer
    for i := range batch {
        if i < skip {
            b.metricDropped(batch[i])
        } else {
            b.buf[re] = batch[i]
            re = b.next(re)
        }
    }

    b.resetBatch()
    b.BufferSize.Set(int64(b.length()))
}

r.buffer.Reject(batch) marks this batch of data as unsent. Regardless of whether some of the data in this batch was written to es successfully, the entire batch is rewritten to es. I think this approach is unreasonable; we should only retry the failed data in res.Failed().
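
For illustration, a purely hypothetical sketch of a plugin-side selective retry. It assumes the bulk response items come back in the same order as the requests, that each item exposes a numeric Status, and that a helper such as writeBulk exists to resend a subset of metrics; none of this is in the current plugin:

    // Hypothetical: collect only the metrics whose bulk items failed with a
    // retryable status, instead of handing the whole batch back to the buffer.
    var failed []telegraf.Metric
    for i, itemsByAction := range res.Items {
        for _, item := range itemsByAction {
            if item.Status == 429 || item.Status >= 500 {
                // Assumes the i-th response item corresponds to the i-th
                // metric added to the bulk request.
                failed = append(failed, metrics[i])
            }
        }
    }
    if len(failed) == 0 {
        // Nothing retryable left; any remaining failures are dropped.
        return nil
    }
    // writeBulk is a hypothetical helper that builds and sends a new bulk
    // request containing only the failed metrics.
    return a.writeBulk(failed)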

powersj commented 1 year ago

Telegraf will reset all the data in this batch (regardless of whether it was successful in the previous batch write operation) back to the buffer, and then rewrite it.

Yes, as I previously said, this is expected behavior. A 429 is not a partial write error; it means too many writes were attempted.

Your original issue was that bad metrics are not dropped. Based on the very limited log lines you have provided, these metrics are not bad. Instead, your system is hammering your elasticsearch instance, and as a result you are getting Too Many Requests errors.

Nothing in the response you have provided demonstrates that there was a partial write, so how is telegraf supposed to know what to do?

telegraf-tiger[bot] commented 11 months ago

Hello! I am closing this issue due to inactivity. I hope you were able to resolve your problem; if not, please try posting this question in our Community Slack or Community Forums, or provide additional details in this issue and request that it be re-opened. Thank you!