influxdata / telegraf

Agent for collecting, processing, aggregating, and writing metrics, logs, and other arbitrary data.
https://influxdata.com/telegraf
MIT License

Kafka consumer SASL plain EOF msg #15768

Closed smga3000 closed 2 months ago

smga3000 commented 2 months ago

Relevant telegraf.conf

[[inputs.kafka_consumer]]
  brokers = ["serverless.warpstream.com:9092"]
  topics = ["sensors"]
  version = "1.0.0"
  enable_tls = true
  sasl_username = "foo"
  sasl_password = "bar"
  sasl_mechanism = "PLAIN"
  consumer_group = "telegraf_metrics_consumers"
  offset = "oldest"
  max_message_len = 1000000
  data_format = "json"
  tag_keys = ["sensorId"]
  json_time_key = "timestamp"
  json_time_format = "unix_ms"

Logs from Telegraf

2024-08-22T18:25:03Z D! [sarama] Initializing new client
2024-08-22T18:25:03Z D! [sarama]  client/metadata fetching metadata for all topics from broker serverless.warpstream.com:9092
2024-08-22T18:25:03Z D! [sarama] Completed pre-auth SASL handshake. Available mechanisms: [PLAIN SCRAM-SHA-512]
2024-08-22T18:25:03Z D! [sarama]  Failed to read response while authenticating with SASL to broker serverless.warpstream.com:9092: EOF
2024-08-22T18:25:03Z D! [sarama]  Closed connection to broker serverless.warpstream.com:9092
2024-08-22T18:25:03Z D! [sarama]  client/metadata got error from broker -1 while fetching metadata: EOF
2024-08-22T18:25:03Z D! [sarama] client/metadata no available broker to send metadata request to
2024-08-22T18:25:03Z D! [sarama]  client/brokers resurrecting 1 dead seed brokers
2024-08-22T18:25:04Z D! [sarama]  client/metadata retrying after 250ms... (2 attempts remaining)
2024-08-22T18:25:04Z D! [sarama]  client/metadata fetching metadata for all topics from broker serverless.warpstream.com:9092
2024-08-22T18:25:04Z D! [sarama] Completed pre-auth SASL handshake. Available mechanisms: [PLAIN SCRAM-SHA-512]
2024-08-22T18:25:04Z D! [sarama]  Failed to read response while authenticating with SASL to broker serverless.warpstream.com:9092: EOF
2024-08-22T18:25:04Z D! [sarama]  Closed connection to broker serverless.warpstream.com:9092
2024-08-22T18:25:04Z D! [sarama]  client/metadata got error from broker -1 while fetching metadata: EOF
2024-08-22T18:25:04Z D! [sarama] client/metadata no available broker to send metadata request to
2024-08-22T18:25:04Z D! [sarama]  client/brokers resurrecting 1 dead seed brokers
2024-08-22T18:25:04Z D! [sarama]  client/metadata retrying after 250ms... (1 attempts remaining)
2024-08-22T18:25:04Z D! [sarama]  client/metadata fetching metadata for all topics from broker serverless.warpstream.com:9092
2024-08-22T18:25:05Z D! [sarama] Completed pre-auth SASL handshake. Available mechanisms: [PLAIN SCRAM-SHA-512]
2024-08-22T18:25:05Z D! [sarama]  Failed to read response while authenticating with SASL to broker serverless.warpstream.com:9092: EOF
2024-08-22T18:25:05Z D! [sarama]  Closed connection to broker serverless.warpstream.com:9092
2024-08-22T18:25:05Z D! [sarama]  client/metadata got error from broker -1 while fetching metadata: EOF
2024-08-22T18:25:05Z D! [sarama] client/metadata no available broker to send metadata request to
2024-08-22T18:25:05Z D! [sarama]  client/brokers resurrecting 1 dead seed brokers
2024-08-22T18:25:05Z D! [sarama]  client/metadata retrying after 250ms... (0 attempts remaining)
2024-08-22T18:25:05Z D! [sarama]  client/metadata fetching metadata for all topics from broker serverless.warpstream.com:9092
2024-08-22T18:25:05Z D! [sarama] Completed pre-auth SASL handshake. Available mechanisms: [PLAIN SCRAM-SHA-512]
2024-08-22T18:25:05Z D! [sarama]  Failed to read response while authenticating with SASL to broker serverless.warpstream.com:9092: EOF
2024-08-22T18:25:05Z D! [sarama]  Closed connection to broker serverless.warpstream.com:9092
2024-08-22T18:25:05Z D! [sarama]  client/metadata got error from broker -1 while fetching metadata: EOF
2024-08-22T18:25:05Z D! [sarama] client/metadata no available broker to send metadata request to
2024-08-22T18:25:05Z D! [sarama]  client/brokers resurrecting 1 dead seed brokers
2024-08-22T18:25:05Z D! [sarama] Closing Client
2024-08-22T18:25:05Z E! [telegraf] Error running agent: starting input inputs.kafka_consumer: create consumer: kafka: client has run out of available brokers to talk to: EOF

System info

telegraf 1.31.3

Docker

No response

Steps to reproduce

  1. Attempt to connect to warpstream.com
  2. Note the EOF errors

Expected behavior

Connect successfully.

Actual behavior

The connection fails with EOF errors.

Additional info

No response

powersj commented 2 months ago

@smga3000,

I believe the service you are talking to requires you to set the Kafka consumer's version option to something higher than the default of 0.10.2.0 (the first version to support consumer groups). When I set version = "1.0.0", I was able to connect and send messages:

2024-08-22T18:40:26Z D! [agent] Starting service inputs
2024-08-22T18:40:36Z D! [outputs.influxdb_v2] Wrote batch of 1000 metrics in 543.391813ms
2024-08-22T18:40:36Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 10000 metrics
2024-08-22T18:40:36Z D! [outputs.influxdb_v2] Wrote batch of 1000 metrics in 137.249748ms
2024-08-22T18:40:36Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 10000 metrics
2024-08-22T18:40:37Z D! [outputs.influxdb_v2] Wrote batch of 115 metrics in 128.903235ms
2024-08-22T18:40:37Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 10000 metrics

To summarize, for your host, we had two changes:

  1. Enable TLS via enable_tls = true
  2. Update the client version to something higher like version = "1.0.0"

Edit: I have updated the issue's configuration with what I got to work, minus credentials.
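
As a rough illustration of the two changes above, here is a minimal pre-flight check in Python. It is only a sketch: `check_kafka_settings` and `parse_version` are hypothetical helpers, not part of Telegraf, and the 1.0.0 threshold is simply the value that worked for this broker in this thread.

```python
DEFAULT_VERSION = "0.10.2.0"  # plugin default mentioned in the comment above


def parse_version(text):
    """Turn a dotted version string such as "1.0.0" into a comparable tuple."""
    return tuple(int(part) for part in text.split("."))


def check_kafka_settings(cfg, min_version="1.0.0"):
    """Hypothetical helper: list the settings that differ from what worked here."""
    problems = []
    if not cfg.get("enable_tls", False):
        problems.append("enable_tls is not set to true")
    version = cfg.get("version", DEFAULT_VERSION)
    if parse_version(version) < parse_version(min_version):
        problems.append(f"version {version} is lower than {min_version}")
    return problems


# A config with neither change applied reports both problems.
print(check_kafka_settings({"sasl_mechanism": "PLAIN"}))
# The working settings from this thread report none.
print(check_kafka_settings({"enable_tls": True, "version": "1.0.0"}))
```

Comparing versions as integer tuples avoids the string-comparison trap where "0.10.2.0" would sort after "0.9.0".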

smga3000 commented 2 months ago

That is working!

powersj commented 2 months ago

@smga3000,

There are two other things I was going to point out. First, based on your data, you could set the timestamp of the metrics by specifying the json_time_key and json_time_format options. I forget what the timestamp looked like, but this is an example:

json_time_key = "timestamp"
json_time_format = "2006-01-02T15:04:05Z"

If you give me an example timestamp I can give you a working example.
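
For readers less familiar with Go-style layouts: json_time_format takes a Go reference-time layout, so the string above describes the shape of the timestamp rather than a literal date. The Python sketch below shows what a matching timestamp would look like. It assumes a string-valued timestamp (which turned out not to be the case for this data), `parse_utc` is a hypothetical helper, and the sample value is borrowed from the log lines earlier in this issue.

```python
from datetime import datetime, timezone

# The Go layout "2006-01-02T15:04:05Z" written as a strptime pattern.
# The trailing "Z" is matched as a literal character.
PY_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def parse_utc(text):
    """Parse a timestamp shaped like the layout and mark it as UTC."""
    return datetime.strptime(text, PY_FORMAT).replace(tzinfo=timezone.utc)


print(parse_utc("2024-08-22T18:25:03Z").isoformat())
```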

The other is setting the device name as a tag. I think it was called device:

  tag_keys = ["device"]

That way InfluxDB indexes the tag!

smga3000 commented 2 months ago

the data looks like this:

{"sensorId":"sensor-0","timestamp":1720474028903,"value":40.16614103770828}
{"sensorId":"sensor-3","timestamp":1720474029292,"value":43.73939640442152}

powersj commented 2 months ago

Looks like a Unix millisecond timestamp:

tag_keys = ["sensorId"]
json_time_key = "timestamp"
json_time_format = "unix_ms"
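
To make the effect of these three options concrete, here is a minimal Python sketch of how one of the sample messages would be split into a tag, a field, and a timestamp. This is an illustration only, not Telegraf's parser: `to_metric` is a hypothetical helper, and it simplifies by keeping only numeric values as fields.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_metric(raw, tag_keys=("sensorId",), time_key="timestamp"):
    """Hypothetical helper: split one JSON message into tags, fields, and a UTC time."""
    doc = json.loads(raw)
    # unix_ms: integer milliseconds since the Unix epoch (integer math, no float rounding)
    ts = EPOCH + timedelta(milliseconds=doc.pop(time_key))
    # Keys listed in tag_keys become string tags and are removed from the document
    tags = {key: str(doc.pop(key)) for key in tag_keys if key in doc}
    # Simplification for this sketch: only the remaining numeric values become fields
    fields = {
        key: value
        for key, value in doc.items()
        if isinstance(value, (int, float)) and not isinstance(value, bool)
    }
    return tags, fields, ts


sample = '{"sensorId":"sensor-0","timestamp":1720474028903,"value":40.16614103770828}'
tags, fields, ts = to_metric(sample)
print(tags, fields, ts.isoformat())
```

For the first sample message, the timestamp 1720474028903 corresponds to 2024-07-08T21:27:08.903 UTC.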

powersj commented 2 months ago

I'm going to close this, but @smga3000 if anything else comes up, feel free to re-open or open another issue!