influxdata / telegraf

Agent for collecting, processing, aggregating, and writing metrics, logs, and other arbitrary data.
https://influxdata.com/telegraf
MIT License

Timestream output does not print ValidationException error #10707

Closed. jonathanwvd closed this issue 2 years ago.

jonathanwvd commented 2 years ago

Relevant telegraf.conf

# Telegraf Configuration
#
# Telegraf is entirely plugin driven. All metrics are gathered from the
# declared inputs, and sent to the declared outputs.
#
# Plugins must be declared in here to be active.
# To deactivate a plugin, comment out the name and any variables.
#
# Use 'telegraf -config telegraf.conf -test' to see what metrics a config
# file would generate.
#
# Environment variables can be used anywhere in this config file, simply surround
# them with ${}. For strings the variable must be within quotes (ie, "${STR_VAR}"),
# for numbers and booleans they should be plain (ie, ${INT_VAR}, ${BOOL_VAR})
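#
# For example (these variable names are placeholders, not ones defined in this
# config):
#   [agent]
#     interval = "${COLLECTION_INTERVAL}"   # string value: quoted
#     metric_batch_size = ${BATCH_SIZE}     # number value: unquoted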

# Global tags can be specified here in key="value" format.
[global_tags]
  # dc = "us-east-1" # will tag all metrics with dc=us-east-1
  # rack = "1a"
  ## Environment variables can be used as tags, and throughout the config file
  # user = "$USER"

# Configuration for telegraf agent
[agent]
  ## Default data collection interval for all inputs
  interval = "10s"
  ## Rounds collection interval to 'interval'
  ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
  round_interval = true

  ## Telegraf will send metrics to outputs in batches of at most
  ## metric_batch_size metrics.
  ## This controls the size of writes that Telegraf sends to output plugins.
  metric_batch_size = 1000

  ## Maximum number of unwritten metrics per output.  Increasing this value
  ## allows for longer periods of output downtime without dropping metrics at the
  ## cost of higher maximum memory usage.
  metric_buffer_limit = 10000

  ## Collection jitter is used to jitter the collection by a random amount.
  ## Each plugin will sleep for a random time within jitter before collecting.
  ## This can be used to avoid many plugins querying things like sysfs at the
  ## same time, which can have a measurable effect on the system.
  collection_jitter = "0s"

  ## Default flushing interval for all outputs. Maximum flush_interval will be
  ## flush_interval + flush_jitter
  flush_interval = "10s"
  ## Jitter the flush interval by a random amount. This is primarily to avoid
  ## large write spikes for users running a large number of telegraf instances.
  ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
  flush_jitter = "0s"

  ## By default or when set to "0s", precision will be set to the same
  ## timestamp order as the collection interval, with the maximum being 1s.
  ##   ie, when interval = "10s", precision will be "1s"
  ##       when interval = "250ms", precision will be "1ms"
  ## Precision will NOT be used for service inputs. It is up to each individual
  ## service input to set the timestamp at the appropriate precision.
  ## Valid time units are "ns", "us" (or "µs"), "ms", "s".
  precision = ""

  ## Log at debug level.
  debug = true
  ## Log only error level messages.
  # quiet = false

  ## Log target controls the destination for logs and can be one of "file",
  ## "stderr" or, on Windows, "eventlog".  When set to "file", the output file
  ## is determined by the "logfile" setting.
  # logtarget = "file"

  ## Name of the file to be logged to when using the "file" logtarget.  If set to
  ## the empty string then logs are written to stderr.
  # logfile = ""

  ## The logfile will be rotated after the time interval specified.  When set
  ## to 0 no time based rotation is performed.  Logs are rotated only when
  ## written to, if there is no log activity rotation may be delayed.
  # logfile_rotation_interval = "0d"

  ## The logfile will be rotated when it becomes larger than the specified
  ## size.  When set to 0 no size based rotation is performed.
  # logfile_rotation_max_size = "0MB"

  ## Maximum number of rotated archives to keep, any older logs are deleted.
  ## If set to -1, no archives are removed.
  # logfile_rotation_max_archives = 5

  ## Pick a timezone to use when logging or type 'local' for local time.
  ## Example: America/Chicago
  # log_with_timezone = ""

  ## Override default hostname, if empty use os.Hostname()
  hostname = ""
  ## If set to true, do not set the "host" tag in the telegraf agent.
  omit_hostname = false

###############################################################################
#                            OUTPUT PLUGINS                                   #
###############################################################################

# Configuration for Amazon Timestream output.
[[outputs.timestream]]

  ## Amazon Region
  region = "us-east-2"

  ## Amazon Credentials
  ## Credentials are loaded in the following order:
  ## 1) Web identity provider credentials via STS if role_arn and web_identity_token_file are specified
  ## 2) Assumed credentials via STS if role_arn is specified
  ## 3) explicit credentials from 'access_key' and 'secret_key'
  ## 4) shared profile from 'profile'
  ## 5) environment variables
  ## 6) shared credentials file
  ## 7) EC2 Instance Profile
  # access_key = ""
  # secret_key = ""
  #token = ""
  #role_arn = ""
  #web_identity_token_file = ""
  #role_session_name = ""
  #profile = ""
  #shared_credential_file = ""

  ## Endpoint to make requests against; the correct endpoint is automatically
  ## determined and this option should only be set if you wish to override the
  ## default.
  ##   ex: endpoint_url = "http://localhost:8000"
  # endpoint_url = ""

  ## Timestream database where the metrics will be inserted.
  ## The database must exist prior to starting Telegraf.
  database_name = "HeronDB"

  ## Specifies if the plugin should describe the Timestream database upon starting,
  ## to validate that it has the necessary permissions, connection, etc., as a safety check.
  ## If the describe operation fails, the plugin will not start
  ## and therefore the Telegraf agent will not start.
  describe_database_on_start = false

  ## The mapping mode specifies how Telegraf records are represented in Timestream.
  ## Valid values are: single-table, multi-table.
  ## For example, consider the following data in line protocol format:
  ## weather,location=us-midwest,season=summer temperature=82,humidity=71 1465839830100400200
  ## airquality,location=us-west no2=5,pm25=16 1465839830100400200
  ## where weather and airquality are the measurement names, location and season are tags,
  ## and temperature, humidity, no2, pm25 are fields.
  ## In multi-table mode:
  ##  - first line will be ingested to table named weather
  ##  - second line will be ingested to table named airquality
  ##  - the tags will be represented as dimensions
  ##  - first table (weather) will have two records:
  ##      one with measurement name equal to temperature,
  ##      another with measurement name equal to humidity
  ##  - second table (airquality) will have two records:
  ##      one with measurement name equal to no2,
  ##      another with measurement name equal to pm25
  ##  - the Timestream tables from the example will look like this:
  ##      TABLE "weather":
  ##        time | location | season | measure_name | measure_value::bigint
  ##        2016-06-13 17:43:50 | us-midwest | summer | temperature | 82
  ##        2016-06-13 17:43:50 | us-midwest | summer | humidity | 71
  ##      TABLE "airquality":
  ##        time | location | measure_name | measure_value::bigint
  ##        2016-06-13 17:43:50 | us-west | no2 | 5
  ##        2016-06-13 17:43:50 | us-west | pm25 | 16
  ## In single-table mode:
  ##  - the data will be ingested to a single table, whose name will be valueOf(single_table_name)
  ##  - the measurement name will be stored in a dimension named valueOf(single_table_dimension_name_for_telegraf_measurement_name)
  ##  - location and season will be represented as dimensions
  ##  - temperature, humidity, no2, pm25 will be represented as measurement names
  ##  - the Timestream table from the example will look like this:
  ##      Assuming:
  ##        - single_table_name = "my_readings"
  ##        - single_table_dimension_name_for_telegraf_measurement_name = "namespace"
  ##      TABLE "my_readings":
  ##        time | location | season | namespace | measure_name | measure_value::bigint
  ##        2016-06-13 17:43:50 | us-midwest | summer | weather | temperature | 82
  ##        2016-06-13 17:43:50 | us-midwest | summer | weather | humidity | 71
  ##        2016-06-13 17:43:50 | us-west | NULL | airquality | no2 | 5
  ##        2016-06-13 17:43:50 | us-west | NULL | airquality | pm25 | 16
  ## In most cases, using multi-table mapping mode is recommended.
  ## However, you can consider using single-table in situations when you have thousands of measurement names.
  mapping_mode = "multi-table"

  ## Only valid and required for mapping_mode = "single-table"
  ## Specifies the Timestream table where the metrics will be uploaded.
  # single_table_name = "yourTableNameHere"

  ## Only valid and required for mapping_mode = "single-table"
  ## Describes what will be the Timestream dimension name for the Telegraf
  ## measurement name.
  # single_table_dimension_name_for_telegraf_measurement_name = "namespace"

  ## Specifies if the plugin should create the table if it does not exist.
  ## The plugin writes the data without prior checking if the table exists.
  ## When the table does not exist, the error returned from Timestream will cause
  ## the plugin to create the table, if this parameter is set to true.
  create_table_if_not_exists = false

  ## Only valid and required if create_table_if_not_exists = true
  ## Specifies the Timestream table magnetic store retention period in days.
  ## Check Timestream documentation for more details.
  # create_table_magnetic_store_retention_period_in_days = 365

  ## Only valid and required if create_table_if_not_exists = true
  ## Specifies the Timestream table memory store retention period in hours.
  ## Check Timestream documentation for more details.
  # create_table_memory_store_retention_period_in_hours = 24

  ## Only valid and optional if create_table_if_not_exists = true
  ## Specifies the Timestream table tags.
  ## Check Timestream documentation for more details
  # create_table_tags = { "foo" = "bar", "environment" = "dev"}

  ## Specify the maximum number of parallel goroutines to ingest/write data.
  ## If not specified, defaults to 1 goroutine.
  max_write_go_routines = 25

###############################################################################
#                            PROCESSOR PLUGINS                                #
###############################################################################

# # Attach AWS EC2 metadata to metrics
# [[processors.aws_ec2]]
#   ## Instance identity document tags to attach to metrics.
#   ## For more information see:
#   ## https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-identity-documents.html
#   ##
#   ## Available tags:
#   ## * accountId
#   ## * architecture
#   ## * availabilityZone
#   ## * billingProducts
#   ## * imageId
#   ## * instanceId
#   ## * instanceType
#   ## * kernelId
#   ## * pendingTime
#   ## * privateIp
#   ## * ramdiskId
#   ## * region
#   ## * version
#   imds_tags = []
#
#   ## EC2 instance tags retrieved with DescribeTags action.
#   ## In case tag is empty upon retrieval it's omitted when tagging metrics.
#   ## Note that in order for this to work, role attached to EC2 instance or AWS
#   ## credentials available from the environment must have a policy attached, that
#   ## allows ec2:DescribeTags.
#   ##
#   ## For more information see:
#   ## https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeTags.html
#   ec2_tags = []
#
#   ## Timeout for HTTP requests made against the AWS EC2 metadata endpoint.
#   timeout = "10s"
#
#   ## ordered controls whether or not the metrics need to stay in the same order
#   ## this plugin received them in. If false, this plugin will change the order
#   ## with requests hitting cached results moving through immediately and not
#   ## waiting on slower lookups. This may cause issues for you if you are
#   ## depending on the order of metrics staying the same. If so, set this to true.
#   ## Keeping the metrics ordered may be slightly slower.
#   ordered = false
#
#   ## max_parallel_calls is the maximum number of AWS API calls to be in flight
#   ## at the same time.
#   ## It's probably best to keep this number fairly low.
#   max_parallel_calls = 10

# # Clone metrics and apply modifications.
# [[processors.clone]]
#   ## All modifications on inputs and aggregators can be overridden:
#   # name_override = "new_name"
#   # name_prefix = "new_name_prefix"
#   # name_suffix = "new_name_suffix"
#
#   ## Tags to be added (all values must be strings)
#   # [processors.clone.tags]
#   #   additional_tag = "tag_value"

# # Convert values to another metric value type
# [[processors.converter]]
#   ## Tags to convert
#   ##
#   ## The table key determines the target type, and the array of key-values
#   ## select the keys to convert.  The array may contain globs.
#   ##   <target-type> = [<tag-key>...]
#   [processors.converter.tags]
#     measurement = []
#     string = []
#     integer = []
#     unsigned = []
#     boolean = []
#     float = []
#
#   ## Fields to convert
#   ##
#   ## The table key determines the target type, and the array of key-values
#   ## select the keys to convert.  The array may contain globs.
#   ##   <target-type> = [<field-key>...]
#   [processors.converter.fields]
#     measurement = []
#     tag = []
#     string = []
#     integer = []
#     unsigned = []
#     boolean = []
#     float = []

# # Dates measurements, tags, and fields that pass through this filter.
# [[processors.date]]
#   ## New tag to create
#   tag_key = "month"
#
#   ## New field to create (cannot set both field_key and tag_key)
#   # field_key = "month"
#
#   ## Date format string, must be a representation of the Go "reference time"
#   ## which is "Mon Jan 2 15:04:05 -0700 MST 2006".
#   date_format = "Jan"
#
#   ## If destination is a field, date format can also be one of
#   ## "unix", "unix_ms", "unix_us", or "unix_ns", which will insert an integer field.
#   # date_format = "unix"
#
#   ## Offset duration added to the date string when writing the new tag.
#   # date_offset = "0s"
#
#   ## Timezone to use when creating the tag or field using a reference time
#   ## string.  This can be set to one of "UTC", "Local", or to a location name
#   ## in the IANA Time Zone database.
#   ##   example: timezone = "America/Los_Angeles"
#   # timezone = "UTC"

# # Filter metrics with repeating field values
# [[processors.dedup]]
#   ## Maximum time to suppress output
#   dedup_interval = "600s"

# # Defaults sets default value(s) for specified fields that are not set on incoming metrics.
# [[processors.defaults]]
#   ## Ensures a set of fields always exists on your metric(s) with their
#   ## respective default value.
#   ## For any given field pair (key = default), if it's not set, a field
#   ## is set on the metric with the specified default.
#   ##
#   ## A field is considered not set if it is nil on the incoming metric;
#   ## or it is not nil but its value is an empty string or is a string
#   ## of one or more spaces.
#   ##   <target-field> = <value>
#   # [processors.defaults.fields]
#   #   field_1 = "bar"
#   #   time_idle = 0
#   #   is_error = true

# # Map enum values according to given table.
# [[processors.enum]]
#   [[processors.enum.mapping]]
#     ## Name of the field to map. Globs accepted.
#     field = "status"
#
#     ## Name of the tag to map. Globs accepted.
#     # tag = "status"
#
#     ## Destination tag or field to be used for the mapped value.  By default the
#     ## source tag or field is used, overwriting the original value.
#     dest = "status_code"
#
#     ## Default value to be used for all values not contained in the mapping
#     ## table.  When unset, the unmodified value for the field will be used if no
#     ## match is found.
#     # default = 0
#
#     ## Table of mappings
#     [processors.enum.mapping.value_mappings]
#       green = 1
#       amber = 2
#       red = 3

# # Run executable as long-running processor plugin
# [[processors.execd]]
#   ## Program to run as daemon
#   ## eg: command = ["/path/to/your_program", "arg1", "arg2"]
#   command = ["cat"]
#
#   ## Delay before the process is restarted after an unexpected termination
#   restart_delay = "10s"

# # Performs file path manipulations on tags and fields
# [[processors.filepath]]
#   ## Treat the tag value as a path and convert it to its last element, storing the result in a new tag
#   # [[processors.filepath.basename]]
#   #   tag = "path"
#   #   dest = "basepath"
#
#   ## Treat the field value as a path and keep all but the last element of path, typically the path's directory
#   # [[processors.filepath.dirname]]
#   #   field = "path"
#
#   ## Treat the tag value as a path, converting it to its last element without its suffix
#   # [[processors.filepath.stem]]
#   #   tag = "path"
#
#   ## Treat the tag value as a path, converting it to the shortest path name equivalent
#   ## to path by purely lexical processing
#   # [[processors.filepath.clean]]
#   #   tag = "path"
#
#   ## Treat the tag value as a path, converting it to a relative path that is lexically
#   ## equivalent to the source path when joined to 'base_path'
#   # [[processors.filepath.rel]]
#   #   tag = "path"
#   #   base_path = "/var/log"
#
#   ## Treat the tag value as a path, replacing each separator character in path with a '/' character. Only has
#   ## an effect on Windows.
#   # [[processors.filepath.toslash]]
#   #   tag = "path"

# # Add a tag of the network interface name looked up over SNMP by interface number
# [[processors.ifname]]
#   ## Name of tag holding the interface number
#   # tag = "ifIndex"
#
#   ## Name of output tag where service name will be added
#   # dest = "ifName"
#
#   ## Name of tag of the SNMP agent to request the interface name from
#   # agent = "agent"
#
#   ## Timeout for each request.
#   # timeout = "5s"
#
#   ## SNMP version; can be 1, 2, or 3.
#   # version = 2
#
#   ## SNMP community string.
#   # community = "public"
#
#   ## Number of retries to attempt.
#   # retries = 3
#
#   ## The GETBULK max-repetitions parameter.
#   # max_repetitions = 10
#
#   ## SNMPv3 authentication and encryption options.
#   ##
#   ## Security Name.
#   # sec_name = "myuser"
#   ## Authentication protocol; one of "MD5", "SHA", or "".
#   # auth_protocol = "MD5"
#   ## Authentication password.
#   # auth_password = "pass"
#   ## Security Level; one of "noAuthNoPriv", "authNoPriv", or "authPriv".
#   # sec_level = "authNoPriv"
#   ## Context Name.
#   # context_name = ""
#   ## Privacy protocol used for encrypted messages; one of "DES", "AES" or "".
#   # priv_protocol = ""
#   ## Privacy password used for encrypted messages.
#   # priv_password = ""
#
#   ## max_parallel_lookups is the maximum number of SNMP requests to
#   ## make at the same time.
#   # max_parallel_lookups = 100
#
#   ## ordered controls whether or not the metrics need to stay in the
#   ## same order this plugin received them in. If false, this plugin
#   ## may change the order when data is cached.  If you need metrics to
#   ## stay in order set this to true.  keeping the metrics ordered may
#   ## be slightly slower
#   # ordered = false
#
#   ## cache_ttl is the amount of time interface names are cached for a
#   ## given agent.  After this period elapses if names are needed they
#   ## will be retrieved again.
#   # cache_ttl = "8h"

# # Apply metric modifications using override semantics.
# [[processors.override]]
#   ## All modifications on inputs and aggregators can be overridden:
#   # name_override = "new_name"
#   # name_prefix = "new_name_prefix"
#   # name_suffix = "new_name_suffix"
#
#   ## Tags to be added (all values must be strings)
#   # [processors.override.tags]
#   #   additional_tag = "tag_value"

# # Parse a value in a specified field/tag(s) and add the result in a new metric
# [[processors.parser]]
#   ## The name of the fields whose value will be parsed.
#   parse_fields = []
#
#   ## If true, incoming metrics are not emitted.
#   drop_original = false
#
#   ## If set to override, emitted metrics will be merged by overriding the
#   ## original metric using the newly parsed metrics.
#   merge = "override"
#
#   ## The dataformat to be read from files
#   ## Each data format has its own unique set of configuration options, read
#   ## more about them here:
#   ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
#   data_format = "influx"

# # Rotate a single valued metric into a multi field metric
# [[processors.pivot]]
#   ## Tag to use for naming the new field.
#   tag_key = "name"
#   ## Field to use as the value of the new field.
#   value_key = "value"

# # Given a tag/field of a TCP or UDP port number, add a tag/field of the service name looked up in the system services file
# [[processors.port_name]]
#   ## Name of tag holding the port number
#   # tag = "port"
#   ## Or name of the field holding the port number
#   # field = "port"
#
#   ## Name of output tag or field (depending on the source) where service name will be added
#   # dest = "service"
#
#   ## Default tcp or udp
#   # default_protocol = "tcp"
#
#   ## Tag containing the protocol (tcp or udp, case-insensitive)
#   # protocol_tag = "proto"
#
#   ## Field containing the protocol (tcp or udp, case-insensitive)
#   # protocol_field = "proto"

# # Print all metrics that pass through this filter.
# [[processors.printer]]

# # Transforms tag and field values as well as measurement, tag and field names with regex pattern
# [[processors.regex]]
#   ## Tag and field conversions are defined in separate sub-tables
#   # [[processors.regex.tags]]
#   #   ## Tag to change
#   #   key = "resp_code"
#   #   ## Regular expression to match on a tag value
#   #   pattern = "^(\\d)\\d\\d$"
#   #   ## Matches of the pattern will be replaced with this string.  Use ${1}
#   #   ## notation to use the text of the first submatch.
#   #   replacement = "${1}xx"
#
#   # [[processors.regex.fields]]
#   #   ## Field to change
#   #   key = "request"
#   #   ## All the power of the Go regular expressions available here
#   #   ## For example, named subgroups
#   #   pattern = "^/api(?P<method>/[\\w/]+)\\S*"
#   #   replacement = "${method}"
#   #   ## If result_key is present, a new field will be created
#   #   ## instead of changing existing field
#   #   result_key = "method"
#
#   ## Multiple conversions may be applied for one field sequentially
#   ## Let's extract one more value
#   # [[processors.regex.fields]]
#   #   key = "request"
#   #   pattern = ".*category=(\\w+).*"
#   #   replacement = "${1}"
#   #   result_key = "search_category"
#
#   ## Rename metric fields
#   # [[processors.regex.field_rename]]
#   #   ## Regular expression to match on a field name
#   #   pattern = "^search_(\\w+)d$"
#   #   ## Matches of the pattern will be replaced with this string.  Use ${1}
#   #   ## notation to use the text of the first submatch.
#   #   replacement = "${1}"
#   #   ## If the new field name already exists, you can either "overwrite" the
#   #   ## existing one with the value of the renamed field OR you can "keep"
#   #   ## both the existing and source field.
#   #   # result_key = "keep"
#
#   ## Rename metric tags
#   # [[processors.regex.tag_rename]]
#   #   ## Regular expression to match on a tag name
#   #   pattern = "^search_(\\w+)d$"
#   #   ## Matches of the pattern will be replaced with this string.  Use ${1}
#   #   ## notation to use the text of the first submatch.
#   #   replacement = "${1}"
#   #   ## If the new tag name already exists, you can either "overwrite" the
#   #   ## existing one with the value of the renamed tag OR you can "keep"
#   #   ## both the existing and source tag.
#   #   # result_key = "keep"
#
#   ## Rename metrics
#   # [[processors.regex.metric_rename]]
#   #   ## Regular expression to match on a metric name
#   #   pattern = "^search_(\\w+)d$"
#   #   ## Matches of the pattern will be replaced with this string.  Use ${1}
#   #   ## notation to use the text of the first submatch.
#   #   replacement = "${1}"

# # Rename measurements, tags, and fields that pass through this filter.
# [[processors.rename]]

# # ReverseDNS does a reverse lookup on IP addresses to retrieve the DNS name
# [[processors.reverse_dns]]
#   ## For optimal performance, you may want to limit which metrics are passed to this
#   ## processor. eg:
#   ## namepass = ["my_metric_*"]
#
#   ## cache_ttl is how long the dns entries should stay cached for.
#   ## generally longer is better, but if you expect a large number of diverse lookups
#   ## you'll want to consider memory use.
#   cache_ttl = "24h"
#
#   ## lookup_timeout is how long to wait for a single dns request to respond.
#   ## this is also the maximum acceptable latency for a metric travelling through
#   ## the reverse_dns processor. After lookup_timeout is exceeded, a metric will
#   ## be passed on unaltered.
#   ## multiple simultaneous resolution requests for the same IP will only make a
#   ## single rDNS request, and they will all wait for the answer for this long.
#   lookup_timeout = "3s"
#
#   ## max_parallel_lookups is the maximum number of dns requests to be in flight
#   ## at the same time. Requests hitting cached values do not count against this
#   ## total, and neither do multiple requests for the same IP.
#   ## It's probably best to keep this number fairly low.
#   max_parallel_lookups = 10
#
#   ## ordered controls whether or not the metrics need to stay in the same order
#   ## this plugin received them in. If false, this plugin will change the order
#   ## with requests hitting cached results moving through immediately and not
#   ## waiting on slower lookups. This may cause issues for you if you are
#   ## depending on the order of metrics staying the same. If so, set this to true.
#   ## keeping the metrics ordered may be slightly slower.
#   ordered = false
#
#   [[processors.reverse_dns.lookup]]
#     ## get the ip from the field "source_ip", and put the result in the field "source_name"
#     field = "source_ip"
#     dest = "source_name"
#
#   [[processors.reverse_dns.lookup]]
#     ## get the ip from the tag "destination_ip", and put the result in the tag
#     ## "destination_name".
#     tag = "destination_ip"
#     dest = "destination_name"
#
#     ## If you would prefer destination_name to be a field instead, you can use a
#     ## processors.converter after this one, specifying the order attribute.

# # Add the S2 Cell ID as a tag based on latitude and longitude fields
# [[processors.s2geo]]
#   ## The name of the lat and lon fields containing WGS-84 latitude and
#   ## longitude in decimal degrees.
#   # lat_field = "lat"
#   # lon_field = "lon"
#
#   ## New tag to create
#   # tag_key = "s2_cell_id"
#
#   ## Cell level (see https://s2geometry.io/resources/s2cell_statistics.html)
#   # cell_level = 9

# # Process metrics using a Starlark script
# [[processors.starlark]]
#   ## The Starlark source can be set as a string in this configuration file, or
#   ## by referencing a file containing the script.  Only one source or script
#   ## should be set at once.
#   ##
#   ## Source of the Starlark script.
#   source = '''
# def apply(metric):
#   return metric
# '''
#
#   ## File containing a Starlark script.
#   # script = "/usr/local/bin/myscript.star"
#
#   ## The constants of the Starlark script.
#   # [processors.starlark.constants]
#   #   max_size = 10
#   #   threshold = 0.75
#   #   default_name = "Julia"
#   #   debug_mode = true

# # Perform string processing on tags, fields, and measurements
# [[processors.strings]]
#   ## Convert a tag value to uppercase
#   # [[processors.strings.uppercase]]
#   #   tag = "method"
#
#   ## Convert a field value to lowercase and store in a new field
#   # [[processors.strings.lowercase]]
#   #   field = "uri_stem"
#   #   dest = "uri_stem_normalised"
#
#   ## Convert a field value to titlecase
#   # [[processors.strings.titlecase]]
#   #   field = "status"
#
#   ## Trim leading and trailing whitespace using the default cutset
#   # [[processors.strings.trim]]
#   #   field = "message"
#
#   ## Trim leading characters in cutset
#   # [[processors.strings.trim_left]]
#   #   field = "message"
#   #   cutset = "\t"
#
#   ## Trim trailing characters in cutset
#   # [[processors.strings.trim_right]]
#   #   field = "message"
#   #   cutset = "\r\n"
#
#   ## Trim the given prefix from the field
#   # [[processors.strings.trim_prefix]]
#   #   field = "my_value"
#   #   prefix = "my_"
#
#   ## Trim the given suffix from the field
#   # [[processors.strings.trim_suffix]]
#   #   field = "read_count"
#   #   suffix = "_count"
#
#   ## Replace all non-overlapping instances of old with new
#   # [[processors.strings.replace]]
#   #   measurement = "*"
#   #   old = ":"
#   #   new = "_"
#
#   ## Trims strings based on width
#   # [[processors.strings.left]]
#   #   field = "message"
#   #   width = 10
#
#   ## Decode a base64 encoded utf-8 string
#   # [[processors.strings.base64decode]]
#   #   field = "message"
#
#   ## Sanitize a string to ensure it is a valid utf-8 string
#   ## Each run of invalid UTF-8 byte sequences is replaced by the replacement string, which may be empty
#   # [[processors.strings.valid_utf8]]
#   #   field = "message"
#   #   replacement = ""

# # Restricts the number of tags that can pass through this filter and chooses which tags to preserve when over the limit.
# [[processors.tag_limit]]
#   ## Maximum number of tags to preserve
#   limit = 10
#
#   ## List of tags to preferentially preserve
#   keep = ["foo", "bar", "baz"]

# # Uses a Go template to create a new tag
# [[processors.template]]
#   ## Tag to set with the output of the template.
#   tag = "topic"
#
#   ## Go template used to create the tag value.  In order to ease TOML
#   ## escaping requirements, you may wish to use single quotes around the
#   ## template string.
#   template = '{{ .Tag "hostname" }}.{{ .Tag "level" }}'

# # Keep only the top k values of the metrics that pass through this filter.
# [[processors.topk]]
#   ## How many seconds between aggregations
#   # period = 10
#
#   ## How many top metrics to return
#   # k = 10
#
#   ## Over which tags the aggregation should be done. Globs can be specified, in
#   ## which case any tag matching the glob will be aggregated over. If set to an
#   ## empty list, no aggregation over tags is done.
#   # group_by = ['*']
#
#   ## Over which fields the top k are calculated
#   # fields = ["value"]
#
#   ## What aggregation to use. Options: sum, mean, min, max
#   # aggregation = "mean"
#
#   ## Instead of the top k largest metrics, return the bottom k lowest metrics
#   # bottomk = false
#
#   ## The plugin assigns each metric a GroupBy tag generated from its name and
#   ## tags. If this setting is different than "" the plugin will add a
#   ## tag (whose name will be the value of this setting) to each metric with
#   ## the value of the calculated GroupBy tag. Useful for debugging
#   # add_groupby_tag = ""
#
#   ## These settings provide a way to know the position of each metric in
#   ## the top k. The 'add_rank_fields' setting allows you to specify for which
#   ## fields the position is required. If the list is non-empty, then a field
#   ## will be added to each and every metric for each string present in this
#   ## setting. This field will contain the ranking of the group that
#   ## the metric belonged to when aggregated over that field.
#   ## The name of the field will be set to the name of the aggregation field,
#   ## suffixed with the string '_topk_rank'
#   # add_rank_fields = []
#
#   ## These settings provide a way to know what values the plugin is generating
#   ## when aggregating metrics. The 'add_aggregate_fields' setting allows you to
#   ## specify for which fields the final aggregation value is required. If the
#   ## list is non-empty, then a field will be added to each and every metric for
#   ## each field present in this setting. This field will contain
#   ## the computed aggregation for the group that the metric belonged to when
#   ## aggregated over that field.
#   ## The name of the field will be set to the name of the aggregation field,
#   ## suffixed with the string '_topk_aggregate'
#   # add_aggregate_fields = []

# # Rotate multi field metric into several single field metrics
# [[processors.unpivot]]
#   ## Tag to use for the name.
#   tag_key = "name"
#   ## Field to use for the name of the value.
#   value_key = "value"

###############################################################################
#                            AGGREGATOR PLUGINS                               #
###############################################################################

# # Keep the aggregate basicstats of each metric passing through.
# [[aggregators.basicstats]]
#   ## The period on which to flush & clear the aggregator.
#   period = "30s"
#
#   ## If true, the original metric will be dropped by the
#   ## aggregator and will not get sent to the output plugins.
#   drop_original = false
#
#   ## Configures which basic stats to push as fields
#   # stats = ["count", "min", "max", "mean", "stdev", "s2", "sum"]

# # Calculates a derivative for every field.
# [[aggregators.derivative]]
#   ## The period in which to flush the aggregator.
#   period = "30s"
#   ##
#   ## If true, the original metric will be dropped by the
#   ## aggregator and will not get sent to the output plugins.
#   drop_original = false
#   ##
#   ## This aggregator will estimate a derivative for each field, which is
#   ## contained in both the first and last metric of the aggregation interval.
#   ## Without further configuration the derivative will be calculated with
#   ## respect to the time difference between these two measurements in seconds.
#   ## The formula applied is for every field:
#   ##
#   ##               value_last - value_first
#   ## derivative = --------------------------
#   ##              time_difference_in_seconds
#   ##
#   ## The resulting derivative will be named *fieldname_rate*. The suffix
#   ## "_rate" can be configured by the *suffix* parameter. When using a
#   ## derivation variable you can include its name for more clarity.
#   # suffix = "_rate"
#   ##
#   ## As an abstraction the derivative can be calculated not only by the time
#   ## difference but by the difference of a field, which is contained in the
#   ## measurement. This field is assumed to be monotonically increasing. This
#   ## feature is used by specifying a *variable*.
#   ## Make sure the specified variable is not filtered and exists in the metrics
#   ## passed to this aggregator!
#   # variable = ""
#   ##
#   ## When using a field as the derivation parameter the name of that field will
#   ## be used for the resulting derivative, e.g. *fieldname_by_parameter*.
#   ##
#   ## Note, that the calculation is based on the actual timestamp of the
#   ## measurements. When there is only one measurement during that period, the
#   ## measurement will be rolled over to the next period. The maximum number of
#   ## such roll-overs can be configured with a default of 10.
#   # max_roll_over = 10
#   ##

# # Report the final metric of a series
# [[aggregators.final]]
#   ## The period on which to flush & clear the aggregator.
#   period = "30s"
#   ## If true, the original metric will be dropped by the
#   ## aggregator and will not get sent to the output plugins.
#   drop_original = false
#
#   ## The time a series must go without updates before it is considered final.
#   series_timeout = "5m"

# # Create aggregate histograms.
# [[aggregators.histogram]]
#   ## The period in which to flush the aggregator.
#   period = "30s"
#
#   ## If true, the original metric will be dropped by the
#   ## aggregator and will not get sent to the output plugins.
#   drop_original = false
#
#   ## If true, the histogram will be reset on flush instead
#   ## of accumulating the results.
#   reset = false
#
#   ## Whether bucket values should be accumulated. If set to false, "gt" tag will be added.
#   ## Defaults to true.
#   cumulative = true
#
#   ## Example config that aggregates all fields of the metric.
#   # [[aggregators.histogram.config]]
#   #   ## Right borders of buckets (with +Inf implicitly added).
#   #   buckets = [0.0, 15.6, 34.5, 49.1, 71.5, 80.5, 94.5, 100.0]
#   #   ## The name of metric.
#   #   measurement_name = "cpu"
#
#   ## Example config that aggregates only specific fields of the metric.
#   # [[aggregators.histogram.config]]
#   #   ## Right borders of buckets (with +Inf implicitly added).
#   #   buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0]
#   #   ## The name of metric.
#   #   measurement_name = "diskio"
#   #   ## The concrete fields of metric
#   #   fields = ["io_time", "read_time", "write_time"]

# # Merge metrics into multifield metrics by series key
# [[aggregators.merge]]
#   ## If true, the original metric will be dropped by the
#   ## aggregator and will not get sent to the output plugins.
#   drop_original = true

# # Keep the aggregate min/max of each metric passing through.
# [[aggregators.minmax]]
#   ## General Aggregator Arguments:
#   ## The period on which to flush & clear the aggregator.
#   period = "30s"
#   ## If true, the original metric will be dropped by the
#   ## aggregator and will not get sent to the output plugins.
#   drop_original = false

# # Keep the aggregate quantiles of each metric passing through.
# [[aggregators.quantile]]
#   ## General Aggregator Arguments:
#   ## The period on which to flush & clear the aggregator.
#   period = "30s"
#
#   ## If true, the original metric will be dropped by the
#   ## aggregator and will not get sent to the output plugins.
#   drop_original = false
#
#   ## Quantiles to output in the range [0,1]
#   # quantiles = [0.25, 0.5, 0.75]
#
#   ## Type of aggregation algorithm
#   ## Supported are:
#   ##  "t-digest" -- approximation using centroids, can cope with large number of samples
#   ##  "exact R7" -- exact computation also used by Excel or NumPy (Hyndman & Fan 1996 R7)
#   ##  "exact R8" -- exact computation (Hyndman & Fan 1996 R8)
#   ## NOTE: Do not use the "exact" algorithms with a large number of samples,
#   ##       as doing so impairs performance and increases memory consumption!
#   # algorithm = "t-digest"
#
#   ## Compression for approximation (t-digest). The value needs to be
#   ## greater than or equal to 1.0. Smaller values will result in better
#   ## performance but less accuracy.
#   # compression = 100.0

# # Aggregate metrics using a Starlark script
# [[aggregators.starlark]]
#   ## The Starlark source can be set as a string in this configuration file, or
#   ## by referencing a file containing the script.  Only one source or script
#   ## should be set at once.
#   ##
#   ## Source of the Starlark script.
#   source = '''
# state = {}
#
# def add(metric):
#   state["last"] = metric
#
# def push():
#   return state.get("last")
#
# def reset():
#   state.clear()
# '''
#
#   ## File containing a Starlark script.
#   # script = "/usr/local/bin/myscript.star"
#
#   ## The constants of the Starlark script.
#   # [aggregators.starlark.constants]
#   #   max_size = 10
#   #   threshold = 0.75
#   #   default_name = "Julia"
#   #   debug_mode = true

# # Count the occurrence of values in fields.
# [[aggregators.valuecounter]]
#   ## General Aggregator Arguments:
#   ## The period on which to flush & clear the aggregator.
#   period = "30s"
#   ## If true, the original metric will be dropped by the
#   ## aggregator and will not get sent to the output plugins.
#   drop_original = false
#   ## The fields for which the values will be counted
#   fields = []

###############################################################################
#                            INPUT PLUGINS                                    #
###############################################################################

# Retrieve data from OPCUA devices
[[inputs.opcua]]

  # Metric name
  name = "opcua_read"
  #
  ## OPC UA Endpoint URL
  endpoint = "opc.tcp://localhost:4840"
  #
  ## Maximum time allowed to establish a connection to the endpoint.
  connect_timeout = "10s"
  #
  ## Maximum time allowed for a request over the established connection.
  request_timeout = "5s"
  #
  ## Security policy, one of "None", "Basic128Rsa15", "Basic256",
  ## "Basic256Sha256", or "auto"
  # security_policy = "auto"
  #
  ## Security mode, one of "None", "Sign", "SignAndEncrypt", or "auto"
  # security_mode = "auto"
  #
  ## Path to cert.pem. Required when security mode or policy isn't "None".
  ## If cert path is not supplied, self-signed cert and key will be generated.
  # certificate = "/etc/telegraf/cert.pem"
  #
  ## Path to private key.pem. Required when security mode or policy isn't "None".
  ## If key path is not supplied, self-signed cert and key will be generated.
  # private_key = "/etc/telegraf/key.pem"
  #
  ## Authentication Method, one of "Certificate", "UserName", or "Anonymous".  To
  ## authenticate using a specific ID, select 'Certificate' or 'UserName'
  # auth_method = "Anonymous"
  #
  ## Username. Required for auth_method = "UserName"
  # username = ""
  #
  ## Password. Required for auth_method = "UserName"
  # password = ""
  #
  ## Option to select the metric timestamp to use. Valid options are:
  ##     "gather" -- uses the time of receiving the data in telegraf
  ##     "server" -- uses the timestamp provided by the server
  ##     "source" -- uses the timestamp provided by the source
  # timestamp = "gather"
  #
  ## Node ID configuration
  ## name              - field name to use in the output
  ## namespace         - OPC UA namespace of the node (integer value 0 thru 3)
  ## identifier_type   - OPC UA ID type (s=string, i=numeric, g=guid, b=opaque)
  ## identifier        - OPC UA ID (tag as shown in opcua browser)
  ## Example:
  ## {name="ProductUri", namespace="0", identifier_type="i", identifier="2262"}
  nodes = [
   {name="random0", namespace="2", identifier_type="i", identifier="200010"},
   {name="random1", namespace="2", identifier_type="i", identifier="200011"},
  ]
  #
  ## Node Group
  ## Sets defaults for OPC UA namespace and ID type so they aren't required in
  ## every node.  A group can also have a metric name that overrides the main
  ## plugin metric name.
  ##
  ## Multiple node groups are allowed
  #[[inputs.opcua.group]]
  ## Group Metric name. Overrides the top level name.  If unset, the
  ## top level name is used.
  # name =
  #
  ## Group default namespace. If a node in the group doesn't set its
  ## namespace, this is used.
  # namespace = "2"
  #
  ## Group default identifier type. If a node in the group doesn't set its
  ## identifier type, this is used.
  # identifier_type =
  #
  ## Node ID Configuration.  Array of nodes with the same settings as above.
  # nodes = [
  #  {name="", namespace="", identifier_type="", identifier=""},
  #  {name="", namespace="", identifier_type="", identifier=""},
  #]

Logs from Telegraf

2022-02-22T17:18:32Z I! Starting Telegraf 1.21.4
2022-02-22T17:18:32Z I! Using config file: /home/jonat/.telegraf/telegraf.conf
2022-02-22T17:18:32Z I! Loaded inputs: opcua
2022-02-22T17:18:32Z I! Loaded aggregators:
2022-02-22T17:18:32Z I! Loaded processors:
2022-02-22T17:18:32Z I! Loaded outputs: timestream
2022-02-22T17:18:32Z I! Tags enabled: host=jonat-Inspiron-7460
2022-02-22T17:18:32Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:"jonat-Inspiron-7460", Flush Interval:10s
2022-02-22T17:18:32Z D! [agent] Initializing plugins
2022-02-22T17:18:32Z W! [inputs.opcua] Failed to load certificate: open /etc/telegraf/cert.pem: no such file or directory
2022-02-22T17:18:32Z D! [agent] Connecting outputs
2022-02-22T17:18:32Z D! [agent] Attempting connection to [outputs.timestream]
2022-02-22T17:18:32Z I! [outputs.timestream] Constructing Timestream client for 'multi-table' mode
2022-02-22T17:18:32Z D! [agent] Successfully connected to outputs.timestream
2022-02-22T17:18:32Z D! [agent] Starting service inputs
2022-02-22T17:18:42Z D! [outputs.timestream] Writing to Timestream: '&{0xc000d2c520 [{[{0xc000d2c480 0xc000d2c490 {}} {0xc000d2c4a0 0xc000d2c4b0 {}}] 0xc000d2c4c0 0xc000d2c4d0 DOUBLE 0xc000d2c4e0 SECONDS 0 {}} {[{0xc000d2c480 0xc000d2c490 {}} {0xc000d2c4a0 0xc000d2c4b0 {}}] 0xc000d2c4f0 0xc000d2c500 VARCHAR 0xc000d2c510 SECONDS 0 {}} {[{0xc000d2c540 0xc000d2c550 {}} {0xc000d2c560 0xc000d2c570 {}}] 0xc000d2c580 0xc000d2c590 DOUBLE 0xc000d2c5a0 SECONDS 0 {}} {[{0xc000d2c540 0xc000d2c550 {}} {0xc000d2c560 0xc000d2c570 {}}] 0xc000d2c5b0 0xc000d2c5c0 VARCHAR 0xc000d2c5d0 SECONDS 0 {}}] 0xc000d2c530 0xc00047e8a0 {}}' with ResourceNotFoundRetry: 'true'
2022-02-22T17:18:44Z E! [outputs.timestream] Failed to write to Timestream database 'HeronDB' table 'opcua_read'. Skipping metric! Error: 'operation error Timestream Write: WriteRecords, https response error StatusCode: 400, RequestID: 626FPAG5WFKZIC4YDWZY4N3AXM, ValidationException: '
2022-02-22T17:18:44Z I! [outputs.timestream] ##WriteToTimestream - Metrics size: 2 request size: 1 time(ms): 2473
2022-02-22T17:18:44Z D! [outputs.timestream] Wrote batch of 2 metrics in 2.474076834s
2022-02-22T17:18:44Z D! [outputs.timestream] Buffer fullness: 0 / 10000 metrics

System info

Telegraf 1.21.4, Ubuntu 20.04.3 LTS

Docker

No response

Steps to reproduce

  1. Run Telegraf using the config file above, for example:
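     (config path taken from the "Logs from Telegraf" section)

     telegraf --config /home/jonat/.telegraf/telegraf.conf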

Expected behavior

Write data to a table in a Timestream DB.

Actual behavior

I get an incomplete error message:

2022-02-22T17:18:44Z E! [outputs.timestream] Failed to write to Timestream database 'HeronDB' table 'opcua_read'. Skipping metric! Error: 'operation error Timestream Write: WriteRecords, https response error StatusCode: 400, RequestID: 626FPAG5WFKZIC4YDWZY4N3AXM, ValidationException: '

Additional info

I'm using the opcua input to collect data from an OPC UA server. The connection is successful and I can see that the data has been collected:

opcua_read,host=jonat-Inspiron-7460 random0=-0.028437482193112373,Quality="OK (0x0)" 1645549130000000000
opcua_read,host=jonat-Inspiron-7460 random1=0.5645155310630798,Quality="OK (0x0)" 1645549130000000000

I'm trying to use the timestream output to write this data to a Timestream database, but I'm getting the following error, in which the validation exception message is missing:

2022-02-22T17:18:44Z E! [outputs.timestream] Failed to write to Timestream database 'HeronDB' table 'opcua_read'. Skipping metric! Error: 'operation error Timestream Write: WriteRecords, https response error StatusCode: 400, RequestID: 626FPAG5WFKZIC4YDWZY4N3AXM, ValidationException: '

I was able to write the inputs.cpu data to Timestream by following the example on the AWS website.
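
It looks like the detail of the ValidationException is getting lost somewhere between the AWS SDK response and the log line. Below is a minimal sketch of how the underlying message might be pulled out of the error with the AWS SDK for Go v2 types; this is only my assumption about where the detail lives, not the plugin's actual code, and the sample message in main() is invented just to show the unwrapping:

package main

import (
	"errors"
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/timestreamwrite/types"
	"github.com/aws/smithy-go"
)

// describeWriteError digs through the error chain returned by WriteRecords and
// returns as much detail as it can find.
func describeWriteError(err error) string {
	// Timestream reports per-record problems via RejectedRecordsException.
	var rejected *types.RejectedRecordsException
	if errors.As(err, &rejected) {
		out := "rejected records:"
		for _, r := range rejected.RejectedRecords {
			out += fmt.Sprintf(" index=%d reason=%q", r.RecordIndex, aws.ToString(r.Reason))
		}
		return out
	}
	// Other service errors (e.g. ValidationException) implement smithy.APIError.
	var apiErr smithy.APIError
	if errors.As(err, &apiErr) {
		return fmt.Sprintf("%s: %s", apiErr.ErrorCode(), apiErr.ErrorMessage())
	}
	return err.Error()
}

func main() {
	// Hand-built example error; the message text here is made up.
	err := fmt.Errorf("operation error Timestream Write: WriteRecords: %w",
		&types.ValidationException{Message: aws.String("example validation detail")})
	fmt.Println(describeWriteError(err))
}

If the service really is returning an empty message, a check like the rejected-records branch above would at least show whether a RejectedRecordsException with per-record reasons is hiding in the error chain.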

Could someone help me with this issue?

telegraf-tiger[bot] commented 2 years ago

Hello! I recommend posting this question in our Community Slack or Community Page, we have a lot of talented community members there who could help answer your question more quickly. Heads up, this issue will be automatically closed after 7 days of inactivity. Thank you!

telegraf-tiger[bot] commented 2 years ago

Hello! I am closing this issue due to inactivity. I hope you were able to resolve your problem, if not please try posting this question in our Community Slack or Community Page. Thank you!