influxdata / telegraf

Agent for collecting, processing, aggregating, and writing metrics, logs, and other arbitrary data.
https://influxdata.com/telegraf
MIT License
14.62k stars 5.58k forks source link

AMQP consumer should not consume more messages than fit in the input buffer #10244

Closed don-code closed 1 year ago

don-code commented 2 years ago

Feature Request

Proposal:

This proposal suggests that Telegraf does not consume an AMQP queue if its local metric buffer is full. Telegraf does not appear to negative-acknowledge messages if the input buffer is full, nor does it refuse to consume if the input buffer is full. Instead, messages are always consumed and dropped on the floor once the local metric buffer is filled. Furthermore, it consumes as many messages as it can once the agent is brought back up.

Current behavior:

If Telegraf agents using the amqp_consumer input are brought back online after an outage, during which messages have been queued on a RabbitMQ queue, they will consume the queue in one large step:

Screen Shot 2021-12-08 at 09 20 42

The agents will then drop the messages on the floor, losing them permanently:

2021-12-08T14:08:44Z I! Starting Telegraf 1.20.0
2021-12-08T14:08:44Z I! Using config file: /etc/telegraf/telegraf.conf
2021-12-08T14:08:44Z I! Loaded inputs: amqp_consumer
2021-12-08T14:08:44Z I! Loaded aggregators:
2021-12-08T14:08:44Z I! Loaded processors:
2021-12-08T14:08:44Z I! Loaded outputs: influxdb
2021-12-08T14:08:44Z I! Tags enabled:
2021-12-08T14:08:44Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:"", Flush Interval:10s
2021-12-08T14:08:46Z W! [outputs.influxdb] Metric buffer overflow; 6926 metrics have been dropped
2021-12-08T14:08:47Z W! [outputs.influxdb] Metric buffer overflow; 20385 metrics have been dropped
2021-12-08T14:08:47Z W! [outputs.influxdb] Metric buffer overflow; 18889 metrics have been dropped
(this message repeats many times)

Desired behavior:

The Telegraf agent should not consume messages off of an AMQP queue if its only recourse is to drop them.

Use case:

One of the benefits of using the AMQP producer/consumer combination is that metrics can be buffered during a backend outage. For instance, metrics can be published to the queue and not consumed while an InfluxDB database is unavailable, then processed in a delayed fashion once InfluxDB becomes available again.

reimda commented 2 years ago

Hi @don-code it makes sense to me that Telegraf shouldn't consume messages and then just drop them.

The first thing that would help someone implementing this change would be to have a way to reproduce the conditions you described, with data in the amqp queue and telegraf that is in metric buffer overflow. Do you have an easy way to set this up so a dev could test a potential change? Thanks!

don-code commented 2 years ago

Hi @reimda - I put together a small test case with Docker Compose. This will launch a RabbitMQ and a Telegraf producer/consumer. The consumer is intentionally configured with a tiny metric buffer.

docker-compose.yml:

version: "3"
services:
  consumer:
    depends_on:
      - rabbitmq
    image: telegraf:1.22.3
    networks:
      - telegraf
    restart: on-failure
    volumes:
      - ./telegraf-consumer.conf:/etc/telegraf/telegraf.conf

  rabbitmq:
    image: rabbitmq:3.8.9-management
    networks:
      - telegraf
    ports:
      - "15672:15672"

  producer:
    depends_on:
      - rabbitmq
    image: telegraf:1.22.3
    networks:
      - telegraf
    volumes:
      - ./telegraf-producer.conf:/etc/telegraf/telegraf.conf
    ports:
      - "8126:8125/udp"

networks:
  telegraf:

telegraf-producer.conf:

[global_tags]
[agent]
  interval = "10s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = ""
  hostname = ""
  omit_hostname = false
[[outputs.amqp]]
   brokers = ["amqp://rabbitmq:5672/"]
   exchange = "telegraf"
    exchange_type = "fanout"
    exchange_durability = "transient"
    username = "guest"
    password = "guest"
    use_batch_format = true
[[inputs.cpu]]
  percpu = true
  totalcpu = true
  collect_cpu_time = false
  report_active = false
[[inputs.disk]]
  ignore_fs = ["tmpfs", "devtmpfs", "devfs", "iso9660", "overlay", "aufs", "squashfs"]
[[inputs.diskio]]
[[inputs.kernel]]
[[inputs.mem]]
[[inputs.processes]]
[[inputs.swap]]
[[inputs.system]]

telegraf-consumer.conf:

[global_tags]
[agent]
  interval = "10s"
  round_interval = true
  metric_batch_size = 10
  metric_buffer_limit = 10
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = ""
  hostname = ""
  omit_hostname = true
[[outputs.exec]]
  command = ["tee", "-a", "/dev/null"]
[[inputs.amqp_consumer]]
   brokers = ["amqp://rabbitmq:5672/"]
    username = "guest"
    password = "guest"
   exchange = "telegraf"
    exchange_type = "fanout"
    exchange_durability = "transient"
   queue = "telegraf-consumer-1"
   queue_durability = "transient"
   binding_key = "#"
    prefetch_count = 1000

After running docker-compose up and waiting a few minutes, this will be showing in the logs:

consumer_1  | 2022-05-10T16:30:35Z W! [outputs.exec] Metric buffer overflow; 4 metrics have been dropped
consumer_1  | 2022-05-10T16:30:45Z W! [outputs.exec] Metric buffer overflow; 21 metrics have been dropped
consumer_1  | 2022-05-10T16:30:55Z W! [outputs.exec] Metric buffer overflow; 21 metrics have been dropped
(...)
powersj commented 1 year ago

@don-code,

Looking through old issues and came across this.

The amqp_consumer plugin is a service plugin that will collect metrics as they are available. However, there is a limit. The max_undelivered_messages will prevent too many metrics from getting read all in at once. I can simulate this by setting the value to 2, and then sending in 4 messages to rabbit. I then see two sets of messages, the first 2, then the second 2:

2023-08-16T18:24:24Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
metric value=41 1692210269580144780
metric value=42 1692210269593845509
2023-08-16T18:24:34Z D! [outputs.file] Wrote batch of 2 metrics in 31.241µs
2023-08-16T18:24:34Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
metric value=43 1692210274365511557
metric value=44 1692210274365520757
2023-08-16T18:24:44Z D! [outputs.file] Wrote batch of 2 metrics in 29.56µs

In your case, your log messages are coming from the output, not the input. What you have currently is an input that will read the default amount of messages, 1000, and send those to the output, which is currently saying keep 10. As a result, we will drop 990 messages, the amqp_consumer will think it is time to read more messages, and as you see the queue from rabbit goes to 0 because of it.

If you are going to modify the output buffer queue then you need to also modify the input setting for max_undelivered_messages. There is no perfect solution here as the two settings do not and cannot know about each other. Additionally, any other plugin that is around can cause additional pressure on the output buffer.

Let me know if I misunderstood your issue, however, I am going to close this as this is working as expected.

edit: I should add that my preferred solution for something like this is to have a dedicated output only for service input plugins. That way I can be sure that no other inputs are causing issues and I can match up the max undelivered messages of the input with the buffer of the output.