influxdata / telegraf

Agent for collecting, processing, aggregating, and writing metrics, logs, and other arbitrary data.
https://influxdata.com/telegraf
MIT License

External plugin does not adhere to metric_batch_size #11902

Closed. agneborn98 closed this issue 11 months ago.

agneborn98 commented 2 years ago

Relevant telegraf.conf

# Telegraf Configuration
#
# Telegraf is entirely plugin driven. All metrics are gathered from the
# declared inputs, and sent to the declared outputs.
#
# Plugins must be declared in here to be active.
# To deactivate a plugin, comment out the name and any variables.
#
# Use 'telegraf -config telegraf.conf -test' to see what metrics a config
# file would generate.
#
# Environment variables can be used anywhere in this config file, simply surround
# them with ${}. For strings the variable must be within quotes (ie, "${STR_VAR}"),
# for numbers and booleans they should be plain (ie, ${INT_VAR}, ${BOOL_VAR})

# Global tags can be specified here in key="value" format.
[global_tags]
  # dc = "us-east-1" # will tag all metrics with dc=us-east-1
  # rack = "1a"
  ## Environment variables can be used as tags, and throughout the config file
  # user = "$USER"

# Configuration for telegraf agent
[agent]
  ## Default data collection interval for all inputs
  interval = "1s"
  ## Rounds collection interval to 'interval'
  ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
  round_interval = false

  ## Telegraf will send metrics to outputs in batches of at most
  ## metric_batch_size metrics.
  ## This controls the size of writes that Telegraf sends to output plugins.
  metric_batch_size = 100

  ## Maximum number of unwritten metrics per output.  Increasing this value
  ## allows for longer periods of output downtime without dropping metrics at the
  ## cost of higher maximum memory usage.
  metric_buffer_limit = 10000

  ## Collection jitter is used to jitter the collection by a random amount.
  ## Each plugin will sleep for a random time within jitter before collecting.
  ## This can be used to avoid many plugins querying things like sysfs at the
  ## same time, which can have a measurable effect on the system.
  collection_jitter = "0s"

  ## Collection offset is used to shift the collection by the given amount.
  ## This can be used to avoid many plugins querying constrained devices
  ## at the same time by manually scheduling them in time.
  # collection_offset = "0s"

  ## Default flushing interval for all outputs. Maximum flush_interval will be
  ## flush_interval + flush_jitter
  flush_interval = "20s"
  ## Jitter the flush interval by a random amount. This is primarily to avoid
  ## large write spikes for users running a large number of telegraf instances.
  ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
  flush_jitter = "0s"

  ## Collected metrics are rounded to the precision specified. Precision is
  ## specified as an interval with an integer + unit (e.g. 0s, 10ms, 2us, 4s).
  ## Valid time units are "ns", "us" (or "µs"), "ms", "s".
  ##
  ## By default or when set to "0s", precision will be set to the same
  ## timestamp order as the collection interval, with the maximum being 1s:
  ##   ie, when interval = "10s", precision will be "1s"
  ##       when interval = "250ms", precision will be "1ms"
  ##
  ## Precision will NOT be used for service inputs. It is up to each individual
  ## service input to set the timestamp at the appropriate precision.
  precision = "1us"

  ## Log at debug level.
   debug = true
  ## Log only error level messages.
   quiet = false

  ## Log target controls the destination for logs and can be one of "file",
  ## "stderr" or, on Windows, "eventlog".  When set to "file", the output file
  ## is determined by the "logfile" setting.
   logtarget = "file"

  ## Name of the file to be logged to when using the "file" logtarget.  If set to
  ## the empty string then logs are written to stderr.
   logfile = "./telegraf.log"
  ## The logfile will be rotated after the time interval specified.  When set
  ## to 0 no time based rotation is performed.  Logs are rotated only when
  ## written to, if there is no log activity rotation may be delayed.
  # logfile_rotation_interval = "0h"

  ## The logfile will be rotated when it becomes larger than the specified
  ## size.  When set to 0 no size based rotation is performed.
  # logfile_rotation_max_size = "0MB"

  ## Maximum number of rotated archives to keep, any older logs are deleted.
  ## If set to -1, no archives are removed.
  # logfile_rotation_max_archives = 5

  ## Pick a timezone to use when logging or type 'local' for local time.
  ## Example: America/Chicago
  # log_with_timezone = ""

  ## Override default hostname, if empty use os.Hostname()
  hostname = ""
  ## If set to true, do not set the "host" tag in the telegraf agent.
  omit_hostname = false

  ## Method of translating SNMP objects. Can be "netsnmp" which
  ## translates by calling external programs snmptranslate and snmptable,
  ## or "gosmi" which translates using the built-in gosmi library.
  # snmp_translator = "netsnmp"

# # Run executable as long-running output plugin
 [[outputs.execd]]
#   ## One program to run as daemon.
#   ## NOTE: process and each argument should each be their own string
   command = ["binary/flight.exe", "-config", "./plugins/output/flight/sample.conf"]
#
#   ## Environment variables
#   ## Array of "key=value" pairs to pass as environment variables
#   ## e.g. "KEY=value", "USERNAME=John Doe",
#   ## "LD_LIBRARY_PATH=/opt/custom/lib64:/usr/local/libs"
#   # environment = []
#
#   ## Delay before the process is restarted after an unexpected termination
   restart_delay = "3s"
#
#   ## Data format to export.
#   ## Each data format has its own unique set of configuration options, read
#   ## more about them here:
#   ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
#   data_format = "influx"

# # Read metrics from MQTT topic(s)
 [[inputs.mqtt_consumer]]
#   ## Broker URLs for the MQTT server or cluster.  To connect to multiple
#   ## clusters or standalone servers, use a separate plugin instance.
#   ##   example: servers = ["tcp://localhost:1883"]
#   ##            servers = ["ssl://localhost:1883"]
#   ##            servers = ["ws://localhost:1883"]
   servers = ["tcp://127.0.0.1:1883"]
#
#   ## Topics that will be subscribed to.
   topics = [
      "city1/test/1",
      "city2/test/1",
      "city3/test/1",
      "city4/test/1",
      "city5/test/1",
      "city6/test/1",
      "city7/test/1",
      "city8/test/1",
      "city9/test/1",
      "city10/test/1"
   ]
#
#   ## The message topic will be stored in a tag specified by this value.  If set
#   ## to the empty string no topic tag will be created.
 topic_tag = ""
#
#   ## QoS policy for messages
#   ##   0 = at most once
#   ##   1 = at least once
#   ##   2 = exactly once
#   ##
#   ## When using a QoS of 1 or 2, you should enable persistent_session to allow
#   ## resuming unacknowledged messages.
#   # qos = 0
#
#   ## Connection timeout for initial connection in seconds
#   # connection_timeout = "30s"
#
#   ## Maximum messages to read from the broker that have not been written by an
#   ## output.  For best throughput set based on the number of metrics within
#   ## each message and the size of the output's metric_batch_size.
#   ##
#   ## For example, if each message from the queue contains 10 metrics and the
#   ## output metric_batch_size is 1000, setting this to 100 will ensure that a
#   ## full batch is collected and the write is triggered immediately without
#   ## waiting until the next flush_interval.
# max_undelivered_messages = 1000
#
#   ## Persistent session disables clearing of the client session on connection.
#   ## In order for this option to work you must also set client_id to identify
#   ## the client.  To receive messages that arrived while the client is offline,
#   ## also set the qos option to 1 or 2 and don't forget to also set the QoS when
#   ## publishing.
#   # persistent_session = false
#
#   ## If unset, a random client ID will be generated.
#   # client_id = ""
#
#   ## Username and password to connect MQTT server.
#   # username = "telegraf"
#   # password = "metricsmetricsmetricsmetrics"
#
#   ## Optional TLS Config
#   # tls_ca = "/etc/telegraf/ca.pem"
#   # tls_cert = "/etc/telegraf/cert.pem"
#   # tls_key = "/etc/telegraf/key.pem"
#   ## Use TLS but skip chain & host verification
#   # insecure_skip_verify = false
#
#   ## Data format to consume.
#   ## Each data format has its own unique set of configuration options, read
#   ## more about them here:
#   ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
   data_format = "json"

   ## Enable extracting tag values from MQTT topics
   ## _ denotes an ignored entry in the topic path
   [[inputs.mqtt_consumer.topic_parsing]] 
    topic = "+/test/+"
    tags = "location/_/_"

#   ## Value supported is int, float, uint
#   #   [[inputs.mqtt_consumer.topic.types]]
#   #      key = type

Logs from Telegraf

2022-09-28T13:10:05Z I! Starting Telegraf 1.25.0-fc8a300f
2022-09-28T13:10:05Z I! Available plugins: 206 inputs, 9 aggregators, 26 processors, 20 parsers, 58 outputs
2022-09-28T13:10:05Z I! Loaded inputs: mqtt_consumer
2022-09-28T13:10:05Z I! Loaded aggregators: 
2022-09-28T13:10:05Z I! Loaded processors: 
2022-09-28T13:10:05Z I! Loaded outputs: execd
2022-09-28T13:10:05Z I! Tags enabled: host=DESKTOP-GLPHF1A
2022-09-28T13:10:05Z I! [agent] Config: Interval:1s, Quiet:false, Hostname:"DESKTOP-GLPHF1A", Flush Interval:20s
2022-09-28T13:10:05Z D! [agent] Initializing plugins
2022-09-28T13:10:05Z D! [agent] Connecting outputs
2022-09-28T13:10:05Z D! [agent] Attempting connection to [outputs.execd]
2022-09-28T13:10:05Z I! [outputs.execd] Starting process: binary/flight.exe [-config ./plugins/output/flight/sample.conf]
2022-09-28T13:10:06Z D! [agent] Successfully connected to outputs.execd
2022-09-28T13:10:06Z D! [agent] Starting service inputs
2022-09-28T13:10:06Z I! [inputs.mqtt_consumer] Connected [tcp://127.0.0.1:1883]
2022-09-28T13:10:26Z D! [outputs.execd] Buffer fullness: 0 / 10000 metrics
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z D! [outputs.execd] Wrote batch of 81 metrics in 12.2744ms
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z D! [outputs.execd] Buffer fullness: 0 / 10000 metrics
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:10:46Z I! [outputs.execd] 1
2022-09-28T13:11:18Z D! [agent] Stopping service inputs
2022-09-28T13:11:18Z D! [inputs.mqtt_consumer] Disconnecting [tcp://127.0.0.1:1883]
2022-09-28T13:11:18Z D! [inputs.mqtt_consumer] Disconnected [tcp://127.0.0.1:1883]
2022-09-28T13:11:18Z D! [agent] Input channel closed
2022-09-28T13:11:18Z I! [agent] Hang on, flushing any cached metrics before shutdown
2022-09-28T13:11:18Z D! [outputs.execd] Buffer fullness: 0 / 10000 metrics
2022-09-28T13:11:18Z I! [agent] Stopping running outputs
2022-09-28T13:11:18Z I! [outputs.execd] Process binary/flight.exe shut down
2022-09-28T13:11:18Z D! [agent] Stopped Successfully

System info

Telegraf 1.25.0-fc8a300f, Windows 11 Pro 10.0.22000

Docker

No response

Steps to reproduce

  1. Run a plugin externally with the execd shim with a batch size larger than 1.
  2. Check the length of the metrics slice passed to the Write() function.

Expected behavior

I expected the length of the slice to be close to the batch size, since that is how the sample configuration describes the purpose of metric_batch_size:

"This controls the size of writes that Telegraf sends to output plugins."
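To make the expected behavior concrete, here is a minimal Go sketch of what "batches of at most metric_batch_size metrics" means for the slice an output receives. It is not Telegraf's buffer code: splitBatches is a hypothetical helper, generic over the element type so it runs without Telegraf. Note that the real agent also writes partial batches on each flush_interval, which is why the log above shows "Wrote batch of 81 metrics" with metric_batch_size = 100.

```go
package main

import "fmt"

// splitBatches is a hypothetical helper (not Telegraf's buffer code) that cuts
// buffered items into consecutive batches of at most batchSize items each.
// It is generic so the sketch runs without Telegraf types (Go 1.18+).
func splitBatches[T any](buf []T, batchSize int) [][]T {
	if len(buf) == 0 {
		return nil
	}
	if batchSize <= 0 {
		// Treat a non-positive size as "no limit": one batch with everything.
		return [][]T{buf}
	}
	var out [][]T
	for len(buf) > 0 {
		n := batchSize
		if len(buf) < n {
			n = len(buf)
		}
		out = append(out, buf[:n])
		buf = buf[n:]
	}
	return out
}

func main() {
	// 250 buffered metrics with metric_batch_size = 100.
	buffered := make([]int, 250)
	for _, b := range splitBatches(buffered, 100) {
		fmt.Println(len(b)) // 100, then 100, then 50
	}
}
```

Under this reading, an output's Write() would receive slices of length 100, 100 and 50, not 250 slices of length 1.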

Actual behavior

As the log shows, I printed the length of the metrics []telegraf.Metric slice with len(metrics), and it is always 1 no matter which batch size I choose. Telegraf still reports "Wrote batch of 100", but the plugin never receives a hundred buffered metrics at once; Write() is simply called a hundred times.

Additional info

Something does change with the batch size: the number of metrics written to the output per second (throughput) changes when I change the value. I don't know what actually changes, but it is not the length of that slice. When I made the plugin internal instead, the slice length was at or near the batch size, so I believe the behavior is specific to running the plugin externally.

reimda commented 2 years ago

The code that causes this behavior is in plugins/common/shim/output.go. https://github.com/influxdata/telegraf/blob/ed8bd1dd525f266f204c921986de40bdf92c4e94/plugins/common/shim/output.go#L47

In this loop, the shim reads a line, parses it into a metric, then passes the single metric to the output's Write() method.

I don't think the execd output plugin marks the beginning or end of a batch when it streams metrics out. That means the shim on the other side can't tell how many metrics to gather into a batch, so it calls Write() once per metric instead.

If this doesn't work for you, could you share any alternative ideas and maybe contribute a PR to implement them?

agneborn98 commented 2 years ago

Thank you for the response!

I don't really have the time or the expertise in Go to come up with a solution for this at the moment.

I think we can fairly say that this is not a bug, but rather a design decision/flaw. So if you agree, we can label this as a feature request instead?

reimda commented 2 years ago

I think you're right that execd outputs should get normal sized batches, not batches of a single metric. Let's leave this issue labeled as a bug.

We will need to work on the design for this fix before writing code. The execd output plugin will need to signal the end of a batch, and the shim will need to watch for that signal. The problem is that the stream between them is InfluxDB line protocol, which doesn't have a native way to signal out-of-band information like this. We can't change to another protocol without breaking existing external outputs built with the current plugin and shim code. We will likely need to hide the signal in a line protocol comment or do something else that works with both current and new code.

I'm not sure when this will be implemented. It's not currently scheduled for the project dev team. I will label it "help wanted" so people will know it's available for community members to work on.

powersj commented 11 months ago

#13673 added the use_batch_format option, which serializes a batch and sends that entire batch to the process's stdin.

Closing as fixed.