influxdata / telegraf

Agent for collecting, processing, aggregating, and writing metrics, logs, and other arbitrary data.
https://influxdata.com/telegraf
MIT License
14.69k stars 5.59k forks source link

EC2 Metadata Processor optional caching for immutable fields #12881

Closed caphrim007 closed 1 year ago

caphrim007 commented 1 year ago

Use Case

The current design of the EC2 Metadata Processor can result in a significant performance hit on telegraf collectors which consume a large number of metrics.

For example, in an instance which we run, Telegraf is consuming ~6000 metrics every 30 seconds. With the AWS Processor enabled and fetching only the region field, this results in a lag of more than 30 seconds.

After speaking with an AWS TAM, we identified several scenarios which could lead to a boost in performance.

Scenario 1 : Instance Restart

During tests none of the following attributes changed other than the pendingTime

{
  "accountId" : "xxxx",
  "architecture" : "x86_64",
  "availabilityZone" : "us-east-1b",
  "billingProducts" : null,
  "devpayProductCodes" : null,
  "marketplaceProductCodes" : null,
  "imageId" : "ami-0557a15b87f6559cf",
  "instanceId" : "i-1ba8eec0ead20f6ac0",
  "instanceType" : "t2.micro",
  "kernelId" : null,
  "pendingTime" : "2023-03-06T16:49:23Z",
  "privateIp" : "172.31.21.208",
  "ramdiskId" : null,
  "region" : "us-east-1",
  "version" : "2017-09-30"
}

Scenario 2 : Instance Stop/Start

When doing a Stop/Start there will be change in pendingTime and also I was checking other links under instance-identity and notice that http://169.254.169.254/latest/dynamic/instance-identity/signature got modified as well. Rest looks the same.

Scenario 3 : While resizing instance

When modifying the instance type, under the /document path, value for instanceType changes along with the pendingTime and http://169.254.169.254/latest/dynamic/instance-identity/signature. Everything remains the same

So looking at the above details, it seems like there are a couple of parameters that can change during a stop/start or resize of the instance. Under all these scenarios instance id remain the same.

If the user does not do any stop/start/reboot or resize, it seems like all values remain the same throughout the lifecycle(till termination) of the instance.

The current list of allowed IMDS Tags in Telegraf is

    "accountId":        {},
    "architecture":     {},
    "availabilityZone": {},
    "billingProducts":  {},
    "imageId":          {},
    "instanceId":       {},
    "instanceType":     {},
    "kernelId":         {},
    "pendingTime":      {},
    "privateIp":        {},
    "ramdiskId":        {},
    "region":           {},
    "version":          {},

Of these, the following IMDS tags are known to change give the above scenarios

The described, signature value is not a valid IMDS Tag in telegraf.

The proposal here is to allow some/all of these values to be cached after initial lookup to prevent rate limiting errors such as

2023-03-16T01:24:32Z E! [processors.aws_ec2] Error when calling GetInstanceIdentityDocument: operation error ec2imds: GetInstanceIdentityDocument, failed to get rate limit token, retry quota exceeded, 7 available, 10 requested

The default value of 10 is sufficient to suppress this error, but the tradeoff is an unusable processor due to too small of a parallel call size. When the processor errors, tags are not added; which defeats the purpose of the processor.

Expected behavior

The processor does not need to continually lookup data which is does not frequently change. Data can be cached either temporarily (to address cases where the metadata values do change) or permanently (for cases where it can't change).

The behavior of Telegraf would continue to be execution of processor calls in the millisecond range. Presently it takes upons of half a minute or more (which causes backpressur and upstream services to fail).

Actual behavior

On a system processing 6k+ metrics every 30 seconds, we observe 30s+ of lag which causes systems upstream of telegraf to fail (Pixie sending OTEL metrics for example). When we exclude the processor, latency returns to ~30ms. When we increase parallel calls, we encounter AWS rate limiting (which then causes tags to be lost)

Additional info

No response

powersj commented 1 year ago

Hi,

Thanks for the detailed research.

Can you help me understand what your proposal to cache the results looks like? If I am reading this correctly, we get all the imds_tags tags with one call. Would we add an option to only refresh tags periodically?

Thanks

caphrim007 commented 1 year ago

hi @powersj ,

The processor presently collects from two sources; imds and ec2 describe. Caching would be useful for either subset of features, but I focused specifically on the IMDS data because 1. I use it, and 2. It's less mutable for the fields that are looked up.

Telegraf does get all of the tags with 1 call, but because of the Add() definition here https://github.com/influxdata/telegraf/blob/master/plugins/processors/aws/ec2/ec2.go#L70, these are enqueued for processing. One for each metric.

When we were experimenting with caching changes, we added a lookup similar to what was done in the reverse_dns plugin. For example,

func (r *AwsIMDSProcessor) Lookup(tag string) (string, error) {
    ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.Timeout))
    defer cancel()

    // check if the value is cached
    r.rwLock.RLock()
    result, found := r.cache.Get(tag)
    if found {
        defer r.rwLock.RUnlock()
        // cache is valid
        return result.(string), nil
    }
    r.rwLock.RUnlock()

    r.Log.Infof("Cache miss for tag: %s", tag)

    r.rwLock.Lock()
    defer r.rwLock.Unlock()
    iido, err := r.imdsClient.GetInstanceIdentityDocument(
        ctx,
        &imds.GetInstanceIdentityDocumentInput{},
    )
    if err != nil {
        return "", err
    }
    v := getTagFromInstanceIdentityDocument(iido, tag)
    if v != "" {
        r.cache.Set(tag, v, cache.DefaultExpiration)
    }
    return v, nil
}

and then called that Lookup function

func (r *AwsIMDSProcessor) asyncAdd(metric telegraf.Metric) []telegraf.Metric {
    if len(r.imdsTagsMap) > 0 {
        for tag := range r.imdsTagsMap {
            result, err := r.Lookup(tag)
            if err != nil {
                r.Log.Errorf("Error when looking up: %v", err)
                continue
            }
            metric.AddTag(tag, result)
        }
    }

    return []telegraf.Metric{metric}
}

We tolerated the risk that pendingTime and instanceType could change because we used neither.

Refreshing periodically would be a reasonable option imho. If this is a configurable value, that would probably suite our needs. For our environment specifically, we wouldn't refresh more than every 24 hours or so. If it were configurable by minutes, with the default behavior remaining the same (never cache) then people could tune this for their environment and change the refresh rate as needed.

Hope that helps.

powersj commented 1 year ago

Thanks for the description of what you did. Would you be willing to put this up in a PR? As you have a way to test this it would be very helpful.

caphrim007 commented 1 year ago

happy to submit a PR, but would like guidance on how the team would want the implementation to look.

do we want a global timer that is configurable via a minutes-like argument? what would you like the new property called? should it apply to both ec2 tags as well as imds? can I re-use the pattern that is in the reverse_dns plugin?

powersj commented 1 year ago

happy to submit a PR, but would like guidance on how the team would want the implementation to look.

awesome :)

do we want a global timer that is configurable via a minutes-like argument? what would you like the new property called? should it apply to both ec2 tags as well as imds? can I re-use the pattern that is in the reverse_dns plugin?

I like your thinking about reusing the experience of the reverse_dns's cache_ttl. Let's call it the same, with a config.Duration, and default to 0h which is disabled.

I do think it should apply to both imds and ec2 tags as well.

Thanks again!

Skyrail commented 1 year ago

We're also encountering a similar problem to this with EC2 tags and the tail input plugin. As we're monitoring access logs, as traffic increases to our instances the error rate increases on the DescribeTags rate limit errors.

A cache timeout configuration of sorts would be perfect as our tags only change on deployment, at which point it's a whole new EC2 instance anyway.

If I can help in anyway, testing or otherwise do let me know as I'm keen to get Telegraf working properly in our prod env and at the moment it's messing with our tags and thus our data.

caphrim007 commented 1 year ago

@powersj i have an image that i'm running locally to test with. had a question.

during development I needed to see if things were working, so I added a go-routine to emit to stdout cache hit/miss/size/full statistics every 30 or so seconds. It logs, then sleeps, and repeats indefinitely.

would you want me to include this in the PR? or exclude it? it was useful for me to see that it was working. might also be useful for operators using the processor to tell that caching is working. but lemme know.

powersj commented 1 year ago

@caphrim007 I agree that could be helpful for others, but I would exclude it by default. You could either a) have it print under debug b) add an option to the processor to enable/disable statistics.

caphrim007 commented 1 year ago

@powersj is there a globally available debug flag that trickles down to the processors? is it this?

telegraf.Debug

or is the more common pattern the latter option?

powersj commented 1 year ago

globally available debug flag that trickles down to the processors

There is not, so if we tried to only print during debug mode, you would always have to kick off the go routine and let the logger take care of printing or not.

I think that means the option makes more sense.

caphrim007 commented 1 year ago

odd. I tried compiling and using that telegraf.Debug thing and I didn't get any errors.

ex.

if telegraf.Debug {
    go r.logCacheStatistics()
}

I found I could control its on/off-ness with

[agent]
debug = true/false

Or at least that's what I appeared to see when examining the stdout from it. Was it behaving differently than I saw?

powersj commented 1 year ago

odd. I tried compiling and using that telegraf.Debug thing and I didn't get any errors.

oh I didn't realize we actually export it - @srebhan thoughts on using this in a plugin?

caphrim007 commented 1 year ago

note, I saw its usage in the groundwork.go and exec.go output plugins, which is what prompted me to try it in the processor to see if I would get an error or not.

caphrim007 commented 1 year ago

@powersj since I havent heard back from @srebhan and I dont see the telegraf.Debug used widely across other plugins, I am going to add another argument called log_cache_stats. lemme know if that name is ok.

powersj commented 1 year ago

@powersj since I havent heard back from @srebhan

fwiw he has had a long weekend due to holidays :)

log_cache_stats. lemme know if that name is ok.

sounds good, thanks

caphrim007 commented 1 year ago

@powersj i noticed the config.Duration type is a value in nanos. does telegraf have a function to convert that back to seconds?

powersj commented 1 year ago

How about:

time.Duration(duration).Seconds()