influxdata / telegraf

Agent for collecting, processing, aggregating, and writing metrics, logs, and other arbitrary data.
https://influxdata.com/telegraf
MIT License

Telegraf not writing to Kafka if at least one message of the batch is intended for non-existing topic #10864

Open ghost opened 2 years ago

ghost commented 2 years ago

Relevant telegraf.conf

[[outputs.kafka]]
  brokers = ["localhost:9092"]
  topic_tag = "X-Topic-Name"
  exclude_topic_tag = true

Logs from Telegraf

2022-03-21T16:13:03Z D! [outputs.kafka] Wrote batch of 1 metrics in 17.505578ms
2022-03-21T16:13:03Z D! [outputs.kafka] Buffer fullness: 0 / 1000000 metrics
2022-03-21T16:13:33Z D! [outputs.kafka] Buffer fullness: 0 / 1000000 metrics
2022-03-21T16:14:03Z D! [outputs.kafka] Buffer fullness: 0 / 1000000 metrics
2022-03-21T16:14:33Z D! [outputs.kafka] Wrote batch of 1 metrics in 1.716951ms
2022-03-21T16:14:33Z D! [outputs.kafka] Buffer fullness: 0 / 1000000 metrics
2022-03-21T16:15:03Z D! [outputs.kafka] Buffer fullness: 0 / 1000000 metrics
2022-03-21T16:10:36Z D! [outputs.kafka] Buffer fullness: 2 / 1000000 metrics
2022-03-21T16:10:36Z E! [agent] Error writing to outputs.kafka: kafka: Failed to produce message to topic test_topic_not_existing: kafka server: Request was for a topic or partition that does not exist on this broker.
2022-03-21T16:11:05Z D! [sarama] client/metadata fetching metadata for [test_topic_not_existing] from broker kafka:9093
2022-03-21T16:11:05Z D! [sarama] client/metadata found some partitions to be leaderless
2022-03-21T16:11:05Z D! [sarama] client/metadata retrying after 250ms... (3 attempts remaining)
2022-03-21T16:11:06Z D! [sarama] client/metadata fetching metadata for [test_topic_not_existing] from broker kafka:9093
2022-03-21T16:11:06Z D! [sarama] client/metadata found some partitions to be leaderless
2022-03-21T16:11:06Z D! [sarama] client/metadata retrying after 250ms... (2 attempts remaining)
2022-03-21T16:11:06Z D! [sarama] client/metadata fetching metadata for [test_topic_not_existing] from broker kafka:9093
2022-03-21T16:11:06Z D! [sarama] client/metadata found some partitions to be leaderless
2022-03-21T16:11:06Z D! [sarama] client/metadata retrying after 250ms... (1 attempts remaining)
2022-03-21T16:11:06Z D! [sarama] client/metadata fetching metadata for [test_topic_not_existing] from broker kafka:9093
2022-03-21T16:11:06Z D! [sarama] client/metadata found some partitions to be leaderless
2022-03-21T16:11:06Z D! [outputs.kafka] Buffer fullness: 2 / 1000000 metrics
2022-03-21T16:11:06Z E! [agent] Error writing to outputs.kafka: kafka: Failed to produce message to topic test_topic_not_existing: kafka server: Request was for a topic or partition that does not exist on this broker.
2022-03-21T16:11:35Z D! [sarama] client/metadata fetching metadata for [test_topic_not_existing] from broker kafka:9093
2022-03-21T16:11:35Z D! [sarama] client/metadata found some partitions to be leaderless
2022-03-21T16:11:35Z D! [sarama] client/metadata retrying after 250ms... (3 attempts remaining)
2022-03-21T16:11:36Z D! [sarama] client/metadata fetching metadata for [test_topic_not_existing] from broker kafka:9093
2022-03-21T16:11:36Z D! [sarama] client/metadata found some partitions to be leaderless
2022-03-21T16:11:36Z D! [sarama] client/metadata retrying after 250ms... (2 attempts remaining)
2022-03-21T16:11:36Z D! [sarama] client/metadata fetching metadata for [test_topic_not_existing] from broker kafka:9093
2022-03-21T16:11:36Z D! [sarama] client/metadata found some partitions to be leaderless
2022-03-21T16:11:36Z D! [sarama] client/metadata retrying after 250ms... (1 attempts remaining)
2022-03-21T16:11:36Z D! [sarama] client/metadata fetching metadata for [test_topic_not_existing] from broker kafka:9093
2022-03-21T16:11:36Z D! [sarama] client/metadata found some partitions to be leaderless
2022-03-21T16:11:36Z D! [outputs.kafka] Buffer fullness: 2 / 1000000 metrics

System info

1.20.3

Docker

No response

Steps to reproduce

  1. Send a metric intended for a non-existing topic
  2. ...

Expected behavior

Telegraf should discard the "faulty" metrics or be able to flush only "genuine" metrics from the batch/buffer.

Actual behavior

If at least one message in the batch is intended for a non-existing topic, Telegraf and Sarama complain about it but leave the buffer untouched. Once such a metric has been encountered, the buffer is never flushed again and Telegraf needs to be restarted.

Additional info

No response

powersj commented 2 years ago

Hi,

> Telegraf should discard the "faulty" metrics or be able to flush only "genuine" metrics from the batch/buffer.

We currently take the messages that need to be written and send them with sarama. If there are any errors, we return the first one. This is the specific error. Based on its name, I suspect it is also thrown when a partition is missing, not only when a topic is missing. Additionally, if a topic really is missing, working out which topic it is and then re-sending everything else does not actually resolve the issue. It may buy you some time, but it seems more like a bandaid that will still go and fill up a metric buffer.

Thoughts?

ghost commented 2 years ago

> It may buy you some time, but it seems more like a bandaid that will still go and fill up a metric buffer.

Thanks for your answer. I agree with this part. It might be better to drop the messages that can't be sent, so that they don't prevent the rest of the buffer from flushing. Or maybe give the user the option to choose the buffer's behavior: keep the messages until the problem is eventually solved, or drop them.
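
A minimal sketch of what such a choice could look like, assuming a hypothetical per-output policy. `failurePolicy`, `keepFailed`, `dropFailed`, `metric`, and `flush` are made-up names for illustration only and are not existing Telegraf options or APIs; `canSend` stands in for the producer. Under either policy the metrics that can be sent are flushed, and the policy only decides what happens to the ones that cannot.

```go
package main

import "fmt"

// failurePolicy is a hypothetical setting invented for this sketch; it is
// not an existing Telegraf option.
type failurePolicy int

const (
	keepFailed failurePolicy = iota // keep failed metrics for the next flush
	dropFailed                      // discard failed metrics
)

// metric is a simplified stand-in for a buffered metric.
type metric struct {
	Topic string
	Value string
}

// flush tries to send every buffered metric and returns the metrics that
// should remain in the buffer, plus counts of sent and dropped metrics.
func flush(buffer []metric, canSend func(metric) bool, policy failurePolicy) (remaining []metric, sent, dropped int) {
	for _, m := range buffer {
		switch {
		case canSend(m):
			sent++
		case policy == keepFailed:
			remaining = append(remaining, m)
		default:
			dropped++
		}
	}
	return remaining, sent, dropped
}

func main() {
	topicExists := func(m metric) bool { return m.Topic == "metrics" }
	buffer := []metric{
		{"metrics", "cpu=1"},
		{"test_topic_not_existing", "cpu=2"},
		{"metrics", "cpu=3"},
	}

	rest, sent, dropped := flush(buffer, topicExists, keepFailed)
	fmt.Println(len(rest), sent, dropped) // prints "1 2 0"

	rest, sent, dropped = flush(buffer, topicExists, dropFailed)
	fmt.Println(len(rest), sent, dropped) // prints "0 2 1"
}
```

With `keepFailed`, the buffer would still grow while the topic is missing, which is the concern raised earlier in the thread; `dropFailed` trades that for data loss.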