influxdata / telegraf

Agent for collecting, processing, aggregating, and writing metrics, logs, and other arbitrary data.
https://influxdata.com/telegraf
MIT License

["outputs.kafka"] did not complete within its flush interval #7624

Closed burnyd closed 4 years ago

burnyd commented 4 years ago

Relevant telegraf.conf:

# # Configuration for the Kafka server to send metrics to
[[inputs.cisco_telemetry_gnmi]]
  addresses = ["10.20.30.24:6030","10.20.30.21:6030"]
  username = "ansible"
  password = "ansible"
  encoding = "proto"
  redial = "10s"
  tagexclude = ["openconfig-network-instance:/network-instances/network-instance/protocols/protocol/name"]

  [[inputs.cisco_telemetry_gnmi.subscription]]
    name = "net"
    origin = "openconfig-interfaces"
    path = "/interfaces/interface/state/counters"
    subscription_mode = "sample"
    sample_interval = "10s"

  [[inputs.cisco_telemetry_gnmi.subscription]]
    name = "bgp"
    origin = "openconfig-network-instance"
    path = "/network-instances/network-instance/protocols/protocol/bgp/neighbors/neighbor/state"
    subscription_mode = "sample"
    sample_interval = "10s"
    heartbeat_interval = "60s"

[[processors.enum]]
  [[processors.enum.mapping]]
    ## Name of the field to map
    field = "session_state"

    [processors.enum.mapping.value_mappings]
      ESTABLISHED = 1
      ACTIVE = 2
      CONNECT = 3
      IDLE = 4
 [[outputs.kafka]]
   brokers = ["kafka:9092"]
   topic = "network"

   client_id = "Telegraf"
   compression_codec = 0
   max_retry = 3

System info:

Docker ce - 19.03.9 telegraf:latest which is 1.14.3

Steps to reproduce:

I am able to run a ZooKeeper/Kafka broker, send a message on a topic via a producer, and see it on a consumer; it's a very basic demo test setup. However, when running telegraf to try to output, I keep receiving the following log message.

[agent] ["outputs.kafka"] did not complete within its flush interval

I have a very basic telegraf.conf file. Is there something I am missing configuration wise?

Expected behavior:

See the output within the kafka topic.

danielnelson commented 4 years ago

Is the data being written? Perhaps the agent metric_batch_size is too large for the plugin to write within a flush interval. What do you have it set to?
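For reference, a sketch of where these settings live. The values shown are Telegraf's documented defaults, which is an assumption about this setup because the posted telegraf.conf has no [agent] section:

```toml
[agent]
  ## Maximum number of metrics sent to an output in one write.
  metric_batch_size = 1000
  ## Maximum number of unwritten metrics buffered per output.
  metric_buffer_limit = 10000
  ## How often each output is flushed. The warning appears when a
  ## flush is still running when the next one is due.
  flush_interval = "10s"
```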

jshcmpbll commented 4 years ago

@burnyd Were you able to resolve this issue? I am seeing the same error for a different plugin and I'm curious whether you were able to fix it.

burnyd commented 4 years ago

@jshcmpbll I never ended up fixing it, unfortunately. If you are able to fix it, please post an update.

littlespace commented 4 years ago

@danielnelson You are right, the metric_batch_size was too high. In my case even metric_batch_size = 1000 was too high for BGP metrics. I had this issue before; after changing to metric_batch_size = 100 and flush_interval = "10s", I no longer get the warning.

@burnyd @jshcmpbll BGP sensors have more tags and fields by default. If you are monitoring more than one device at a time and each device has more than 50 peers, make sure you check the metric_batch_size and interval in your telegraf config file. By the way, I use interval = "1s" in my config file, and I try not to monitor more than one device per telegraf agent (I have at least 300-400 peers on each device).
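Put together, the settings described above would look roughly like this in the [agent] section. This is a sketch assembled from the values mentioned in this comment, not a copy of a real config; every other agent option is left out and would keep whatever value you already have.

```toml
[agent]
  ## Collection interval mentioned in the comment above.
  interval = "1s"
  ## Reduced from 1000 so that each write to the output is smaller.
  metric_batch_size = 100
  flush_interval = "10s"
```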

jshcmpbll commented 4 years ago

[agent]
  interval = "60s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = ""
  debug = false
  quiet = false
  logfile = ""
  hostname = ""
  omit_hostname = false

[[inputs.statsd]]
  protocol = "udp"
  service_address = ":8125"
  delete_gauges = true
  delete_counters = true
  delete_sets = true
  delete_timings = true
  percentiles = [90]
  metric_separator = "_"
  parse_data_dog_tags = true
  allowed_pending_messages = 20000
  percentile_limit = 1000

[[inputs.cpu]]
  percpu = true
  totalcpu = true
  collect_cpu_time = false

[[inputs.disk]]
  ignore_fs = ["tmpfs", "devtmpfs", "devfs", "iso9660", "overlay", "aufs", "squashfs"]
  mount_points = ["/"]

[[inputs.diskio]]

[[inputs.kernel]]

[[inputs.linux_sysctl_fs]]

[[inputs.mem]]

[[inputs.net]]
  interfaces = ["eth0"]

[[inputs.netstat]]

[[inputs.processes]]

[[inputs.procstat]]
  pattern = "(consul|vault)"

[[inputs.swap]]

[[inputs.system]]

[[inputs.consul]]
  address = "localhost:8501"
  scheme = "https"
  insecure_skip_verify = true

[[inputs.http_response]]
  name_suffix           = "_consul"
  interval              = "60s"
  address               = "https://127.0.0.1:8501/v1/agent/self"
  method                = "GET"
  response_string_match = "\"leader\":\"false\""
  insecure_skip_verify  = true

[[inputs.x509_cert]]
  sources = ["https://localhost:8501/"]
  insecure_skip_verify = true
  name_suffix = "_consul"

[[outputs.azure_monitor]]
  resource_id = "**************" ## Commented out intentionally

[[inputs.http_response]]
  name_suffix           = "_vault"
  interval              = "60s"
  address               = "https://127.0.0.1:8443/v1/sys/health?uninitcode=200&performancestandbycode=200&drsecondarycode=200"
  method                = "GET"
  response_string_match = "\"standby\":false"
  insecure_skip_verify  = true

[[inputs.x509_cert]]
  sources = ["https://localhost:8443/"]
  insecure_skip_verify = true
  name_suffix = "_vault"

This is the config I'm dealing with. It's not the same output (azure_monitor instead of kafka), but I was curious if anyone had thoughts on it.

@littlespace

What does "peers" mean in your message? I'm a bit new to telegraf; I tried searching the docs and didn't see it mentioned.

I've tried reducing metric_batch_size to 100 and raising it to 2000. I haven't made many changes to metric_buffer_limit, so I might give that a go. I've also played with the interval and flush_interval quite a bit, testing 1s, 10s, 100s, etc.
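One way to narrow this down, sketched under the assumption that the rest of the [agent] section stays exactly as posted: turn on debug logging. With it enabled, the agent reports each batch write and the buffer fill level per output, which should make it easier to tell whether writes to azure_monitor are slow or failing outright.

```toml
[agent]
  ## Only this line changes; debug was false in the config above.
  ## All other [agent] options stay as they are.
  debug = true
```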

littlespace commented 4 years ago

I was talking about the BGP peers.