influxdata / telegraf

Agent for collecting, processing, aggregating, and writing metrics, logs, and other arbitrary data.
https://influxdata.com/telegraf
MIT License
14.63k stars 5.58k forks source link

["outputs.kafka"] did not complete within its flush interval after kafka broker reboot (after upgrade from telegraf:1.20.2-alpine to telegraf:1.22.1-alpine) and does not recover without telegraf restart #11427

Closed rakopoul closed 2 years ago

rakopoul commented 2 years ago

Relevant telegraf.conf

# Telegraf Configuration
#
# Telegraf is entirely plugin driven. All metrics are gathered from the
# declared inputs, and sent to the declared outputs.
#
# Plugins must be declared in here to be active.
# To deactivate a plugin, comment out the name and any variables.
#
# Use 'telegraf -config telegraf.conf -test' to see what metrics a config
# file would generate.
#
# Environment variables can be used anywhere in this config file, simply prepend
# them with $. For strings the variable must be within quotes (ie, "$STR_VAR"),
# for numbers and booleans they should be plain (ie, $INT_VAR, $BOOL_VAR)

# Global tags can be specified here in key="value" format.
[global_tags]
  "ecs.swarm" = "$ECS_SWARM_NAME"
  # rack = "1a"
  ## Environment variables can be used as tags, and throughout the config file
  # user = "$USER"

# Configuration for telegraf agent
[agent]
  ## Default data collection interval for all inputs
  interval = "10s"
  ## Rounds collection interval to 'interval'
  ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
  round_interval = true

  ## Telegraf will send metrics to outputs in batches of at most
  ## metric_batch_size metrics.
  ## This controls the size of writes that Telegraf sends to output plugins.
  metric_batch_size = 1000

  ## For failed writes, telegraf will cache metric_buffer_limit metrics for each
  ## output, and will flush this buffer on a successful write. Oldest metrics
  ## are dropped first when this buffer fills.
  ## This buffer only fills when writes fail to output plugin(s).
  metric_buffer_limit = 10000

  ## Collection jitter is used to jitter the collection by a random amount.
  ## Each plugin will sleep for a random time within jitter before collecting.
  ## This can be used to avoid many plugins querying things like sysfs at the
  ## same time, which can have a measurable effect on the system.
  collection_jitter = "0s"

  ## Default flushing interval for all outputs. You shouldn't set this below
  ## interval. Maximum flush_interval will be flush_interval + flush_jitter
  flush_interval = "10s"
  ## Jitter the flush interval by a random amount. This is primarily to avoid
  ## large write spikes for users running a large number of telegraf instances.
  ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
  flush_jitter = "0s"

  ## By default, precision will be set to the same timestamp order as the
  ## collection interval, with the maximum being 1s.
  ## Precision will NOT be used for service inputs, such as logparser and statsd.
  ## Valid values are "ns", "us" (or "µs"), "ms", "s".
  precision = ""

  ## Logging configuration:
  ## Run telegraf with debug log messages.
  debug = false
  ## Run telegraf in quiet mode (error log messages only).
  quiet = false
  ## Specify the log file name. The empty string means to log to stderr.
  logfile = ""

  ## Override default hostname, if empty use os.Hostname()
  hostname = "$DOCKER_HOST_HOSTNAME"
  ## If set to true, do no set the "host" tag in the telegraf agent.
  omit_hostname = false

###############################################################################
#                            PROCESSOR PLUGINS                                #
###############################################################################

[[processors.regex]]
  namepass = [
    "docker_container_cpu",
    "docker_container_mem",
    "docker_container_net",
    "docker_container_blkio"
  ]
  # Removes the unique task ID part from the docker swarm task name, to avoid
  # high series cardinality. The swarm task name format is
  # swarm_service_name.auto-increment.task_id, for example:
  # "ecs-proxy.2.jtcp6neq4kaga21zak09juigm". The following processor will only
  # keep the "ecs-proxy.2" part.
  [[processors.regex.tags]]
    key = "com.docker.swarm.task.name"
    pattern = "^([^\\.]+\\.[0-9]+)\\..+$"
    replacement = "${1}"

###############################################################################
#                            OUTPUT PLUGINS                                   #
###############################################################################

# Configuration for the Kafka server to send metrics to
[[outputs.kafka]]
  ## URLs of kafka brokers
  brokers = KAFKA_BROKERS_LIST
  ## Kafka topic for producer messages
  topic = "$KAFKA_METRICS_TOPIC"
  ## Telegraf tag to use as a routing key
  ##  ie, if this tag exists, it's value will be used as the routing key
  routing_tag = "host"

  ## CompressionCodec represents the various compression codecs recognized by
  ## Kafka in messages.
  ##  0 : No compression
  ##  1 : Gzip compression
  ##  2 : Snappy compression
  compression_codec = 0

  ##  RequiredAcks is used in Produce Requests to tell the broker how many
  ##  replica acknowledgements it must see before responding
  ##   0 : the producer never waits for an acknowledgement from the broker.
  ##       This option provides the lowest latency but the weakest durability
  ##       guarantees (some data will be lost when a server fails).
  ##   1 : the producer gets an acknowledgement after the leader replica has
  ##       received the data. This option provides better durability as the
  ##       client waits until the server acknowledges the request as successful
  ##       (only messages that were written to the now-dead leader but not yet
  ##       replicated will be lost).
  ##   -1: the producer gets an acknowledgement after all in-sync replicas have
  ##       received the data. This option provides the best durability, we
  ##       guarantee that no messages will be lost as long as at least one in
  ##       sync replica remains.
  required_acks = -1

  ## Optional SSL Config
  # ssl_ca = "/etc/telegraf/ca.pem"
  # ssl_cert = "/etc/telegraf/cert.pem"
  # ssl_key = "/etc/telegraf/key.pem"
  ## Use SSL but skip chain & host verification
  # insecure_skip_verify = false

  ## Data format to output.
  ## Each data format has it's own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
  data_format = "influx"

###############################################################################
#                            INPUT PLUGINS                                    #
###############################################################################
# Read file input for fds
[[inputs.exec]]
  # Read total number of files used
  commands = ["/bin/bash /home/monitor_file_descriptors.sh"]

  # Read every 60s
  interval = "60s"
  timeout = "5s"

  name_override = "FileDescriptors"

  data_format = "influx"

  taginclude = [
    "ecs.metric.category",
    "ecs.swarm",
    "host",
  ]

  [inputs.exec.tags]
    "ecs.metric.category" = "host"

# Read metrics about cpu usage
[[inputs.cpu]]
  ## Whether to report per-cpu stats or not
  percpu = false
  ## Whether to report total system cpu stats or not
  totalcpu = true
  ## If true, collect raw CPU time metrics.
  collect_cpu_time = false

  fieldpass = [
    "usage_guest_nice",
    "usage_guest",
    "usage_idle",
    "usage_irq",
    "usage_nice",
    "usage_softirq",
    "usage_steal",
    "usage_system",
    "usage_user",
  ]

  [inputs.cpu.tags]
    "ecs.metric.category" = "host"

# Read metrics about disk usage by mount point
[[inputs.disk]]
  ## By default, telegraf gather stats for all mountpoints.
  ## Setting mountpoints will restrict the stats to the specified mountpoints.
  mount_points = [ "/opt/applications" ]

  ## Ignore some mountpoints by filesystem type. For example (dev)tmpfs (usually
  ## present on /run, /var/run, /dev/shm or /dev).
  ignore_fs = ["tmpfs", "devtmpfs", "overlay"]

  taginclude = [
    "ecs.metric.category",
    "ecs.swarm",
    "host",
    "path",
  ]
  fieldpass = [
    "inodes_free",
    "inodes_used",
    "used_percent",
    "used",
  ]
  [inputs.disk.tags]
    "ecs.metric.category" = "host"

# Read metrics about disk IO by device
[[inputs.diskio]]
  ## By default, telegraf will gather stats for all devices including
  ## disk partitions.
  ## Setting devices will restrict the stats to the specified devices.
  # devices = ["sda", "sdb"]
  ## Uncomment the following line if you need disk serial numbers.
  # skip_serial_number = false

  taginclude = [
    "ecs.metric.category",
    "ecs.swarm",
    "host",
    "name",
  ]
  fieldpass = [
    "read_bytes",
    "read_time",
    "reads",
    "write_bytes ",
    "write_time",
    "writes",
  ]
  [inputs.diskio.tags]
    "ecs.metric.category" = "host"

# Read internal telegraf metrics
[[inputs.internal]]
  namepass = [
    "internal_agent",
    "internal_write",
  ]
  taginclude = [
    "ecs.metric.category",
    "ecs.swarm",
    "host",
    "output",
  ]
  fieldpass = [
    "write_time_ns",  # internal_write
    "gather_errors",  # internal_agent
  ]
  [inputs.internal.tags]
    "ecs.metric.category" = "host"

# Get kernel statistics from /proc/stat
[[inputs.kernel]]
  # no configuration
  fieldpass = [
    "context_switches",
    "processes_forked",
  ]
  [inputs.kernel.tags]
    "ecs.metric.category" = "host"

# Read metrics about memory usage
[[inputs.mem]]
  # no configuration
  fieldpass = [
    "available",
    "buffered",
    "cached",
    "free",
    "total",
    "used_percent",
    "used",
  ]
  [inputs.mem.tags]
    "ecs.metric.category" = "host"

# Get the number of processes and group them by status
[[inputs.processes]]
  # no configuration
  fieldpass = [
    "blocked",
    "running",
    "sleeping",
    "stopped",
    "total_threads",
    "total",
    "zombies",
  ]
  [inputs.processes.tags]
    "ecs.metric.category" = "host"

# Read metrics about swap memory usage
[[inputs.swap]]
  # no configuration
  [inputs.swap.tags]
    "ecs.metric.category" = "host"

# Read metrics about system load & uptime
[[inputs.system]]
  # no configuration
  fieldpass = [
    "load1",
    "load15",
    "load5",
    "n_cpus",
    "uptime",
    "uptime",
  ]
  [inputs.system.tags]
    "ecs.metric.category" = "host"

# Gather metrics about network interfaces
[[inputs.net]]
  ## By default, telegraf gathers stats from any up interface (excluding loopback)
  ## Setting interfaces will tell it to gather these explicit interfaces,
  ## regardless of status.
  ##
  # interfaces = ["eth0"]
  [inputs.net.tags]
    "ecs.metric.category" = 'host'

# Read metrics about docker containers
[[inputs.docker]]
  ## Docker Endpoint
  ##   To use TCP, set endpoint = "tcp://[ip]:[port]"
  ##   To use environment variables (ie, docker-machine), set endpoint = "ENV"
  endpoint = "unix:///var/run/docker.sock"
  ## Only collect metrics for these containers, collect all if empty
  container_name_include = []
  ## Timeout for docker list, info, and stats commands
  timeout = "5s"

  namepass = [
    "docker_container_cpu",
    "docker_container_mem",
    "docker_container_net",
    "docker_container_blkio",
    "docker_container_health"
  ]

  ## Whether to report for each container per-device blkio (8:0, 8:1...) and
  ## network (eth0, eth1, ...) stats or not
  perdevice = false

  ## Whether to report for each container total blkio and network stats or not
  total = true

  # Label to be converted to tags
  docker_label_include = [
    "type",
    "com.docker.swarm.service.name",
    "com.docker.swarm.task.name",
    "ecs.service.key",
    "ecs.service.name",
    "ecs.swarm"
  ]

  taginclude = [
    "com.docker.swarm.service.name",
    "com.docker.swarm.task.name",
    "container_image",
    "ecs.metric.category",
    "ecs.service.key",
    "ecs.service.name",
    "ecs.swarm",
    "engine_host",
    "host",
    "type",
    "cpu",
    "network",
    "device",
    "container_status"
  ]

  fieldpass = [
    "container_id",  # docker_container_cpu
    "usage_percent",  # docker_container_cpu
    "usage",  # docker_container_mem
    "cache",  # docker_container_mem
    "rx_bytes",  # docker_container_net
    "tx_bytes",  # docker_container_net
    "rx_dropped",  # docker_container_net
    "tx_dropped",  # docker_container_net
    "rx_errors",  # docker_container_net
    "tx_errors",  # docker_container_net
    "io_serviced_recursive_read",  # docker_container_blkio
    "io_serviced_recursive_write",  # docker_container_blkio
    "io_service_bytes_recursive_read",  # docker_container_blkio
    "io_service_bytes_recursive_write",  # docker_container_blkio
    "health_status"  # docker_container_health
  ]

  [inputs.docker.tags]
    "ecs.metric.category" = "docker"

  # Keep only points tagged with cpu=cpu-total
  [inputs.docker.tagdrop]
    "cpu" = [
      "cpu0",
      "cpu1",
      "cpu2",
      "cpu3",
      "cpu4",
      "cpu5",
      "cpu6",
      "cpu7"
    ]

Logs from Telegraf

Logs are producing this message in the affected telegraf containers:

"["outputs.kafka"] did not complete within its flush interval"

System info

telegraf:1.22.1-alpine, docker engine 20.10.16

Docker

We have a cluster with around 100 VMs managed with docker swarm. On each VM we have several telegraf agents to monitor different metrics (microservice or VM specific) reporting a certain Kafka topic. Our Kafka is composed of three VMs for HA. After upgrading from 1.20.2-alpine to 1.22.1-alpine it seems that when we reboot a Kafka VM (not all of them together at any time) e.g. for maintenance reasons some of the telegraf agents stuck in the following log:

"["outputs.kafka"] did not complete within its flush interval"

which from they never recover without restarting the relevant container. For this time we dont have any metric for these VMs. The same happens for other input plugins as well we use like elastic or postgres (but not for all telegraf - seems to me that those that report most data produce the problem easier). I tried to mitigate the issue by increasing the flush_interval, but it seems that i have to use values greater than the time the rebooting Kafka VM takes to come up to fix it which is not acceptable for our metrics. The same never happened with 1.20.2-alpine version of docker image.

I reproduced the same issue with Kafka reboot on our dev environment as well which is much smaller scale.

Steps to reproduce

1.Telegraf reporting data to a topic in Kafka. 2.Restart one of the Kafka VMs. 3. ...

Expected behavior

I would expect telegraf to send metrics to the other two VMs while one is rebooting and is not available, or at least recover after VM has come up.

Actual behavior

Some telegraf containers keep not reporting metrics and producing the following message in logs:

"["outputs.kafka"] did not complete within its flush interval"

Additional info

No response

powersj commented 2 years ago

Hi,

Please enable debug mode in your config and provide the telegraf logs during a failure.

"["outputs.kafka"] did not complete within its flush interval"

This message alone is not necessary a bad thing. It only means that a flush happened and and it took longer than expected. It does not mean that Telegraf never is able to recover or connect. In that case, an error from the output about not being able to connect would appear.

The kafka plugin works using the sarama library and given the complexity of a kafka deployment the additional debug logs from the sarama library are required to understand what else is going on. Also understand that the sarama plugin only uses the nodes you specify initially to collect metadata about the cluster. After that point, the metadata that it receives, which tells it the leader and other node information is used.

Thanks

rakopoul commented 2 years ago

Hello and thanks for your answer,

I understand that log by itself is ok if it happens sometimes. The issue which i dont understand is why some agents stuck in this state (no metrics at all are sent) and require restart of the container to come back to normal and this happens only after upgrade. I would be ok if metrics are lost during restart of Kafka VM (this normally happens only during maintenance phase), although my Kafka list consists of three VMs and i would expect telegraf agent to connect to one of the other ones in the list. But now telegraf does not recover after Kafka VM has came up again and needs restart.

I will try to capture debug logs and provide them.

BR

rakopoul commented 2 years ago

Here are the debug logs. 2022-07-01T10:22:23Z --> Around this time i rebooted intentionally one of the Kafka brokers, the other two brokers in the telegraf list were up and running.

A couple minutes later kafka broker recovered but still telegraf seems to fill up the buffer and never recovers. It recovers only if i reboot the containers. It did not happen to all agents i have in my environment but in around 30-40% of them. Telegraf debug logs.txt

powersj commented 2 years ago

Based on the logs it does not seem like the node goes down for very long. Then after it gets shut down a couple of times sarama gives up trying to connect to that broker.

There also never appears to be an election for a new leader when the node first goes down.

It would be good to run with debugging on the other nodes as well to understand how some are recovering. Right now, there is not enough info in this bug other than further guessing. Can you collect debug logs on other nodes as well and provide one from one that recovers?

rakopoul commented 2 years ago

I attach logs from a node that recovers after Kafka reboots (still some other telegraf containers did not recover): Telegraf debug logs (from node that recovers).txt

We are planning to downgrade telegraf to older version since this is critical for our ability to monitor our production VMs (where we never had such a problem).

powersj commented 2 years ago

Thanks for those logs.

Telegraf v1.20.2 had v1.27.2 of sarama. Telegraf v1.22.1 had v1.32.0 of sarama.

v1.32.0 of sarama was superseded due to https://github.com/Shopify/sarama/issues/2150, due to producer hanging issue.

Given Telegraf has also released v1.23.1, which includes v1.34.1 of sarama, can I ask if you can reproduce using the latest sarama library with debug still enabled?

I really do appreciate the back and forth. There is not a lot of kafka specific code and the sarama library is used for the connection and retries, so it would be really good to try out a new version. Thanks!

rakopoul commented 2 years ago

I tested with telegraf v.1.23.1-alpine and i did not find and producer hanging. So do you think that issue is the same as #2150?

powersj commented 2 years ago

Awesome, thank you for trying that out!

And yes, I do think it is related to the sarma issue. The PR that resolved the issue spelled out the following scenario:

1) start an async producer, which telegraf uses 2) restart a broker multiple times 3) the logs demonstrate a leader update/retries 4) sarama stops sending messages

This would affect any version of Telegraf that uses v1.32.0 of sarama, which I think is v1.22.1-v1.23.0. The resolution is to move to the later v1.34.1 of the sarama library in v1.23.1 of Telegraf, which contains the fix.

As a result, I am going to close this issue, but if you do come across something related to this, please feel free to comment. Again I greatly appreciate you gathering those logs and trying the different versions!

AnishAngadi1807 commented 11 months ago

telegraf-1.27.4-1.x86_64.rpm I have installed the above mentioned telegraf agent. Facing the same buffer fullness issue and telegraf is failing to scrape the metrics even though its in runnin state on my Linux VM. This is the configuration we have used. metric_buffer_limit = 200000 collection_jitter = "0s" flush_interval = "200s" flush_jitter = "0s" precision = "0s" hostname = "" omit_hostname = true interval = "200s" debug = true quiet = false