influxdata / telegraf

Agent for collecting, processing, aggregating, and writing metrics, logs, and other arbitrary data.
https://influxdata.com/telegraf
MIT License

Output buffer persistence #802

Open kostasb opened 8 years ago

kostasb commented 8 years ago

To avoid dropping data from the output buffer because of a Telegraf service restart, extended connectivity loss with consumers, or any other unexpected incident, there should be an option to persist the output buffer on disk.

Enabling such a feature will introduce I/O dependencies for Telegraf, so it should be optional and most probably disabled by default. Persistence should be enabled on a per-output-plugin basis, depending on whether dropping data is critical or not.

Proposed config file sample:

[agent]
  max_buffer_limit = 1000

[[outputs.influxdb]]
  ...
  persist_buffer = true

[[outputs.graphite]]
  ...
  persist_buffer = false

@sparrc thoughts?

joezhoujinjing commented 7 years ago

Is there a plan for this feature? @sparrc

sparrc commented 7 years ago

nope, sorry, it will be assigned a milestone when there is

biker73 commented 7 years ago

I use Kafka for this and then use Telegraf to read from / write to it. Kafka is great as a store to persist data, to make that data available to others, and to set custom retention policies on topics. As Kafka is 'free' under the Apache License, why rewrite an excellent solution that already exists? Telegraf supports both input and output to Kafka, and Kafka is a very versatile / scalable product for this kind of purpose.
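
For illustration, a minimal sketch of the edge side of that pattern using the Kafka output plugin (the broker address and topic name are placeholders, not taken from this thread):

[[outputs.kafka]]
  ## Kafka broker(s) acting as the durable store
  brokers = ["localhost:9092"]
  ## Topic the metrics are published to; a second Telegraf (or any other
  ## consumer) reads this topic and writes to the final destination
  topic = "telegraf"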

bfgoodrich commented 7 years ago

Kafka still doesn't solve the problem when Kafka itself becomes an issue. There should be some way to enable on-disk persistence (with some limit) so that data isn't lost in the event that an output becomes temporarily unavailable.

kt97679 commented 6 years ago

+1 for persistent buffers, this is a very useful feature.

voiprodrigo commented 6 years ago

Elastic just added this capability to Beats. https://github.com/elastic/beats/pull/6581

Just noting here as maybe parts of their implementation could be useful.

voiprodrigo commented 6 years ago

Anything planned for this?

Jaeyo commented 6 years ago

+1

voiprodrigo commented 5 years ago

Maybe for 2.0? :)

danielnelson commented 5 years ago

Maybe, this is not high priority right now and requires a substantial amount of design work. One aspect that has changed is that Telegraf will now only acknowledge messages from a queue after they have been processed (sent from all outputs or filtered), so it should be possible to use a queue to transfer messages durably with Telegraf.
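
As a sketch of that acknowledgment behaviour on the consuming side (broker, topic, and consumer group are placeholders): inputs.kafka_consumer only acknowledges messages once they have been handled, and max_undelivered_messages caps how many can be in flight, so the input pauses instead of dropping data when outputs fall behind.

[[inputs.kafka_consumer]]
  brokers = ["localhost:9092"]
  topics = ["telegraf"]
  consumer_group = "telegraf_metrics"
  ## Limit of un-acknowledged messages before the consumer pauses;
  ## messages are acknowledged only after all outputs have accepted them
  max_undelivered_messages = 1000
  data_format = "influx"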

PWSys commented 4 years ago

Any suggestions on picking a simple single instance message queue?

markusr commented 4 years ago

@PWSys: I briefly did some tests with the following setup: Data --> telegraf --> RabbitMQ --> telegraf --> influxdb, using the AMQP input and output plugins.

It worked, but I decided not to use it because it adds too much complexity. Since all your data is stored in RabbitMQ, you need to configure and operate it properly. This was quite a challenge for me since I had never used RabbitMQ before. Maybe you have more experience with it.

See RabbitMQ config and persistence.
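
A rough sketch of the two Telegraf configs in that chain, with placeholder broker URLs and names; the option names follow the AMQP plugin READMEs as I recall them, so verify against the current docs:

# Edge Telegraf: publish metrics to RabbitMQ
[[outputs.amqp]]
  brokers = ["amqp://localhost:5672/"]
  exchange = "telegraf"
  ## Ask RabbitMQ to persist messages to disk
  delivery_mode = "persistent"
  data_format = "influx"

# Central Telegraf: consume from RabbitMQ and write to InfluxDB
[[inputs.amqp_consumer]]
  brokers = ["amqp://rabbitmq.example.com:5672/"]
  exchange = "telegraf"
  queue = "telegraf"
  data_format = "influx"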

PWSys commented 4 years ago

@markusr Thanks for the info!

I was also looking at this, but instead with a single instance of Kafka. It can be deployed fairly simply as a container, but like you, I question the complexity, and ultimately whether or not it will decrease the system's resiliency.

darinfisher commented 4 years ago

Taken care of by #4938

ssoroka commented 4 years ago

Hey, I'm going to reopen this because I don't think #4938 addresses this issue. Slow outputs cause metrics to be dropped without blocking inputs. This ticket is asking for metric durability for outputs.

This request isn't unreasonable, it just hasn't been a high priority. It might be helpful to take a minute to summarize my thoughts on this, some of the concerns around how to address it, and what should be kept in mind when addressing it. I guess you all could use an update after 4 years.

Telegraf currently tries its best to balance keeping up with the metric flow and storing metrics that haven't been written to an output yet, up to the metric_buffer_limit; past that limit, we intentionally drop metrics to avoid OOM scenarios. At some point, it's reasonable to think Telegraf will never catch up and it should just do its best from that point on.

A review of the concerns at play:

It's not entirely easy to weave durability into that. There are a few potential options for what to implement:

  1. best-scenario "durability": on shutdown Telegraf saves the output buffers to disk before quitting. This isn't real durability, but it might be what some users want.
  2. real output durability: Telegraf writes all output-buffered messages to disk (but no durability for non-output messages). One could imagine non-trivial cost and non-trivial implementation here.
  3. real full-telegraf-stack durability: Telegraf writes all incoming messages to disk, all transformations to disk, and only removes them after it's sure they're accepted downstream, forcing backpressure everywhere to ensure it doesn't over-consume metrics in flight.

This issue describes option 2. I don't think option 3 is generally all that useful for metric data, and I can't help thinking that option 1 will cause more problems than it solves.

mrdavidaylward commented 4 years ago

A buffer to disk, much like persistent queues in Logstash, would be great. I run an ISP, and when a tower goes down I rely heavily on backfill over my backbone dishes to save the day; the issue is that when the downtime is too long, I run out of memory or lose metrics.

I think that when the in-memory metric buffer is full there should be a disk metric buffer option, so that only after the in-memory buffer fills does Telegraf start writing the overflow to disk. Having the intelligence to write to memory until that limit is hit can help avoid disk-related slowdowns, or thrashing of eMMC in the case of my setup.

Looking back in the thread this does look like a feature people are looking for.

ssoroka commented 4 years ago

I think there's a balance that could be struck here: best-effort storing of metrics that don't fit in the buffer, maybe with some kind of modified tail to read the records back in from disk. inputs.tail has backpressure built into it, so it will naturally not get ahead of itself (it will avoid consuming too much and avoid dropping metrics).

based on that, a potential solution could be:

Will think this over and run it past the team.

russorat commented 4 years ago

Connecting this issue: https://github.com/influxdata/telegraf/issues/2679 When something changes for an output that requires a config reload, maybe on SIGHUP, buffers are written to disk, then immediately processed with the new config. Maybe there is a new config option for this?

In addition to a path, you might need some of the other options from the file output for limiting size and/or rotation: https://github.com/influxdata/telegraf/tree/master/plugins/outputs/file#configuration

Is the behavior to store metrics in a memory queue and "flush" those metrics to disk once the limit has been hit? Then continue filling the in-memory queue again? When the connection is restored, is the process reversed until all files are processed? Files would be processed and removed once a successful response from the output is confirmed.

I assume there would be one file per output plugin, similar to the one buffer per output we have today? And some naming convention for duplicate output configs (two influxdb outputs, for example)?

ssoroka commented 4 years ago

@darinfisher This sounds like it's overloaded, and wouldn't ever catch up? Would be interested to see if the metric input rate is spiky.

ssoroka commented 4 years ago

@russorat

Is the behavior to store metrics in a memory queue and "flush" those metrics to disk once the limit has been hit?

Sort of. Right now, if the queue is full, we drop the message. It'd be easy enough to redirect that message to disk.

Then continue filling the in memory queue again?

this would always be the default.

When the connection is restored, the process is reversed until all files are processed?

I explain this below.

File(s) would be processed and removed once it is confirmed a successful response from the output.

yes.

We can easily write to disk when an output buffer is full (maybe even via outputs.file). inputs.tail supports backpressure, so it could be used to read the files back in without dropping metrics. Essentially these things would always run, and only metrics that don't fit in memory would be routed to this disk-buffered loop.
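
A sketch of what that disk-buffered loop could look like with today's plugins (file path and rotation limits are placeholders). Note that the missing piece is the routing: as things stand, outputs.file would receive all metrics, not just the overflow.

# Spill metrics to disk, with rotation to bound disk usage
[[outputs.file]]
  files = ["/var/lib/telegraf/overflow.out"]
  rotation_max_size = "50MB"
  rotation_max_archives = 5
  data_format = "influx"

# Read the spilled metrics back in; inputs.tail applies backpressure,
# so it will not out-run the downstream outputs
[[inputs.tail]]
  files = ["/var/lib/telegraf/overflow.out"]
  from_beginning = true
  data_format = "influx"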

some challenges:

barbaranelson commented 4 years ago

I'm leaning more towards making this an input plugin problem. It's up to the input plugin to either apply backpressure to its source or store locally. From the rest of Telegraf's perspective, it is applying backpressure to the input plugin, and the input plugin hides whether it is actually propagating that backpressure or just writing to disk and reading back from disk later on.

Making it a problem on the output side seems more complicated than letting the input side take care of it.

mrdavidaylward commented 4 years ago

I mean, running two Telegrafs, one writing to a file only and the other reading and sending, would work. lol, one of those woke-up-at-2am-and-thought-this-was-a-good-idea moments. Although it is kind of a smart stupid idea ;) besides all the flaws...

ssoroka commented 4 years ago

Couple things to keep in mind:

So really we're just trading one set of problems for another.

bfg111 commented 4 years ago

Ideally, this would be a per-output memory/disk buffer, so that when issues occur writing to one output the system remains operational for all other outputs, and the buffer writes to disk until that particular output becomes operational again. I would expect some disk high watermark that, once reached, causes messages destined for that output to be dropped instead of written to the disk buffer. A global max memory threshold and a max disk threshold for the combined per-output buffers would also make sense. Once the memory buffer is full, it would spill over to the disk buffer.

rightkick commented 4 years ago

I think this is a very useful feature to avoid data loss for critical data and still keep a simple and robust data pipeline. Is there any plan to include this?

I have the following scenario: a low-spec hardware appliance at the edge collecting metrics in InfluxDB that needs to push the data to a central server. The network connection is intermittent and the hardware appliance may be restarted. It would be good to have an option in Telegraf to retain unsent data after a loss of communication or a restart of the device. The data are critical and must be retained. Also, due to the low device specs (60GB disk, 4GB RAM, 2 CPU cores) it cannot run Apache Kafka. One would need to investigate other options (RabbitMQ or other) to get this capability, and it would be nice to avoid adding more components into the mix.

rightkick commented 3 years ago

Ended up adding RabbitMQ into the mix to ensure data persistence for the Telegraf buffer.

naorw commented 3 years ago

I agree it's important, especially if using an external store DB such as InfluxCloud; the local DC can have connectivity issues, so why lose metrics or manage external components?

v-j-f commented 3 years ago

Hi all,

Is there a maximum limit for the metric_buffer_limit value? How does increasing this value affect memory consumption?

We frequently see in the telegraf log the following messages:

[outputs.influxdb] Metric buffer overflow; 2045 metrics have been dropped.

Currently we have configured:

[agent]
  metric_buffer_limit = 10000

[[outputs.influxdb]]
  metric_buffer_limit = 750000

but metrics are still being dropped.

Environment:

Thanks.

Sorry if this is not the right thread for this.

ssoroka commented 3 years ago

Is there a maximum limit for the metric_buffer_limit value? How does increasing this value affect memory consumption?

There is no specific maximum, but you will eventually run out of memory before the max can be hit. For smaller metrics, I assume about 1 KB of memory multiplied by the max number of metrics (the metric_buffer_limit) for memory use. For larger metrics you may need to assume a value larger than 1 KB (figuring out this number exactly isn't trivial). Leave room for error so you don't see "out of memory" crashes.
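
A rough worked example under that assumption (the ~1 KB per metric figure is an estimate, not a measured value): with metric_buffer_limit = 750000 on an output, a completely full buffer would need roughly 750000 × 1 KB ≈ 750 MB of RAM for that single output, on top of Telegraf's normal memory usage.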

Note that if you always see metric drops no matter the metric_buffer_limit, it may be because you have more data throughput than the output can keep up with. For outputs.influxdb you can try enabling gzip compression, but in general, if throughput is the problem, the resolution of this issue is not going to solve it; you're just going to fill your disk as well and see "disk out of space" error messages.
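
For reference, a minimal sketch of enabling gzip on the InfluxDB v1 output (the URL and database are placeholders):

[[outputs.influxdb]]
  urls = ["http://127.0.0.1:8086"]
  database = "telegraf"
  ## Compress request bodies to reduce bandwidth towards the output
  content_encoding = "gzip"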

v-j-f commented 3 years ago

Thank you @ssoroka for the reply.

leventov commented 3 years ago

For our project, we would like to have metrics persisted in an SQLite buffer which is also available for local querying as a "micro" timeseries database. Since it's probably not in sync with the vision of project maintainers, we are going to fork Telegraf to add support for this. But I would be happy to learn otherwise.

ssoroka commented 3 years ago

@leventov rather than a fork, take a look at processors.execd, outputs.execd, and execd shim for custom plugins.
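
As a sketch of that suggestion, an outputs.execd configuration that streams metrics to an external program; the sqlite-writer command here is hypothetical, something you would implement yourself:

[[outputs.execd]]
  ## Hypothetical program that reads line protocol on stdin and writes it to SQLite
  command = ["/usr/local/bin/sqlite-writer", "--db", "/var/lib/telegraf/metrics.db"]
  data_format = "influx"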

WireFr33 commented 2 years ago

With reference to some of the concerns above:

- backpressure: Does TCP not address this already? Do we need to duplicate TCP flow control?
- slow consumers: Same as above, TCP flow control.
- out of order: Can this be addressed with timestamps in the data?

Can a persistent output buffer with a user configurable fixed size be implemented together with the following?

jhychan commented 2 years ago

Chiming in here as this is a feature that would be great to have. However, to me it sounds a lot like a problem that could be solved by write-ahead logging, which is what a few comments here have already described. It's also what prometheus/loki agents (grafana-agent/promtail) have implemented.

Would it be feasible to add a WAL implementation to Telegraf as part of the outputs? It would be an optional common feature for all output plugins, but users would only enable it for output plugins where they need the benefits of a WAL. It could serve to complement the memory buffer, or perhaps entirely replace the in-memory metric buffer. Could we not treat the WAL simply as an alternative to the memory buffer, with some performance trade-offs? Memory and disk effectively serve the same function here: storing metrics in case a given output plugin fails to process them. The only difference is that the disk buffer is slower but can be persisted and re-processed when Telegraf starts back up.

Concerns about disk utilisation can be managed much like the in-memory buffer: limit by disk usage, or by the number of metrics it can hold. And like the current metric buffer, if the output plugin fails to process the metrics, they simply remain in the WAL until dropped by overflow (or manually cleared, if the implementation allows).

andoks commented 2 years ago

One idea might be to create a simple Write-ahead-log CLI tool (e.g by using https://pkg.go.dev/github.com/tidwall/wal or a similar library), and integrate that tool using a combination of outputs.exec(d) and inputs.exec(d) plugins to persist the values, making the CLI tool handle the pruning and truncating of the WAL.

This would however require that the inputs.exec(d) plugin and subsequent plugins apply backpressure and/or acknowledge data written.
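
A sketch of that wiring, where wal-write and wal-read are hypothetical commands built around such a WAL library (they are not existing tools):

# Append metrics to the write-ahead log
[[outputs.execd]]
  command = ["/usr/local/bin/wal-write", "--dir", "/var/lib/telegraf/wal"]
  data_format = "influx"

# Replay entries from the write-ahead log; the tool would be responsible
# for pruning entries once they have been delivered
[[inputs.execd]]
  command = ["/usr/local/bin/wal-read", "--dir", "/var/lib/telegraf/wal"]
  signal = "none"
  data_format = "influx"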

n3k232 commented 1 year ago

Totally in need of a solution for this.

TsubasaBE commented 1 year ago

+1 for this feature. This is still the only missing feature keeping us from moving from Elastic Beats to Telegraf entirely.

yash1234singh commented 1 year ago

+1 for this feature. Lack of this is forcing me to fluent-bit

pratikdas44 commented 1 year ago

+1, having a similar request. Due to a context deadline issue, sometimes metrics don't go through until we restart the container. And since the Kafka plugin supports reading only from the oldest or newest offset, recovery takes time. I had asked the same question to ChatGPT.

I wanted to know if those features existed earlier and were removed, because when I tested Telegraf it showed an error, as expected. Has anyone else run into the same problem, and how did they solve it?

WireFr33 commented 1 year ago

Looks like ChatGPT is creating features that do not exist: metric_buffer_full_trigger, buffer_path and a "dead letter queue" do not seem to exist in Telegraf. An AI bot that writes this code would be great.

vlcinsky commented 1 year ago

I have the same need, and our current solution uses a local instance of InfluxDB OSS with bucket replication configured to forward the data to the final InfluxDB instance.

Our chain looks like:

On localhost:

  1. process reading out the data and sending records (in line protocol format) to mosquitto (MQTT) on localhost
  2. mosquitto has configured a bridge to forward records to central system (to support "just now" picture)
  3. telegraf agent using mqtt_client input plugin writing data to localhost influxdb
  4. InfluxDB:
    • storing data in given bucket, possibly having retention set to short period such as 2 days
    • having configured replication on given bucket, forwarding all incoming data to the central instance

On central instance:

I came to this issue to find out if current Telegraf would be an option to replace the localhost InfluxDB instance and make the chain a bit simpler.
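
For step 3 of the localhost chain above, a minimal sketch assuming the inputs.mqtt_consumer plugin and a local InfluxDB 2.x (topic, bucket, organization, and token are placeholders):

[[inputs.mqtt_consumer]]
  servers = ["tcp://127.0.0.1:1883"]
  topics = ["probe/+"]
  data_format = "influx"

[[outputs.influxdb_v2]]
  urls = ["http://127.0.0.1:8086"]
  token = "$INFLUX_TOKEN"
  organization = "acme"
  bucket = "probes"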

pratikdas44 commented 1 year ago

@vlcinsky so Telegraf is also sending metrics from localhost to mqtt installed in the central instance?

vlcinsky commented 1 year ago

@pratikdas44 No, there is no need to use telegraf for replicating mqtt messages.

We are using the mosquitto bridge directly, which allows replication of messages between two mosquitto instances.

The relevant part of mosquitto.conf could look like:

connection bridge
address mqttcentralserver.acme.com:1883
remote_username john
remote_password secretpassword
topic probe/+ out 0

This belongs to the localhost/edge instance; it listens on the local topic probe/+ (e.g. probe/sn1234 or probe/sn5678) and tries to send the messages to mqttcentralserver.acme.com:1883. In this case it does not change the topic on the mqttcentralserver.

It allows for a persistent queue in case connectivity is lost, but as we use MQTT just to deliver current messages to the central server, we do not use this queuing. For safe delivery of all messages we use the replication of the InfluxDB bucket from the local/edge instance to the central InfluxDB one.

For the InfluxDB replication we use InfluxDB OSS v2.7 and configure it as described here: https://docs.influxdata.com/influxdb/v2.7/write-data/replication/replicate-data/

willhope commented 1 year ago

+1 for this feature! vmagent has this capability (it works smoothly in environments with unstable connections to remote storage: if the remote storage is unavailable, the collected metrics are buffered at -remoteWrite.tmpDataPath; the buffered metrics are sent to remote storage as soon as the connection is repaired; the maximum disk usage for the buffer can be limited with -remoteWrite.maxDiskUsagePerURL).

Hipska commented 7 months ago

Hey all, please have a look at the spec doc for implementing this feature and give your feedback if needed: #14928

DStrand1 commented 2 months ago

We've implemented this feature in #15564 and landed it in the latest nightly builds; we would appreciate any feedback on it!
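
As a sketch of what enabling it might look like, with the caveat that the option names below are assumptions rather than confirmed from #15564; check the PR and the agent documentation shipped with the nightly for the authoritative settings:

[agent]
  ## Assumed option names for the new disk buffer; verify before use
  buffer_strategy = "disk"
  buffer_directory = "/var/lib/telegraf/buffer"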