influxdata / telegraf

Agent for collecting, processing, aggregating, and writing metrics, logs, and other arbitrary data.
https://influxdata.com/telegraf
MIT License
14.18k stars 5.52k forks source link

Amqp consumer connection fail #13746

Closed matteoberts closed 3 months ago

matteoberts commented 11 months ago

Relevant telegraf.conf

...
###############################################################################
#                            INPUT PLUGINS                                    #
###############################################################################

[[inputs.amqp_consumer]]
  ## Broker to consume from.
  ##   deprecated in 1.7; use the brokers option
  # url = "amqp://localhost:5672/influxdb"

  ## Brokers to consume from.  If multiple brokers are specified a random broker
  ## will be selected anytime a connection is established.  This can be
  ## helpful for load balancing when not using a dedicated load balancer.
  brokers = ["amqp://localhost:5672"]

  ## Authentication credentials for the PLAIN auth_method.
  # username = ""
  # password = ""

  ## Name of the exchange to declare.  If unset, no exchange will be declared.
  exchange = "telegraf"

  ## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash".
  # exchange_type = "topic"

  ## If true, exchange will be passively declared.
  # exchange_passive = false

  ## Exchange durability can be either "transient" or "durable".
  # exchange_durability = "durable"

  ## Additional exchange arguments.
  # exchange_arguments = { }
  # exchange_arguments = {"hash_property" = "timestamp"}

  ## AMQP queue name.
  queue = "sensorsmq"

  ## AMQP queue durability can be "transient" or "durable".
  queue_durability = "durable"

  ## If true, queue will be passively declared.
  # queue_passive = false

  ## A binding between the exchange and queue using this binding key is
  ## created.  If unset, no binding is created.
  binding_key = "#"

  ## Maximum number of messages server should give to the worker.
  prefetch_count = 1000

  ## Maximum messages to read from the broker that have not been written by an
  ## output.  For best throughput set based on the number of metrics within
  ## each message and the size of the output's metric_batch_size.
  ##
  ## For example, if each message from the queue contains 10 metrics and the
  ## output metric_batch_size is 1000, setting this to 100 will ensure that a
  ## full batch is collected and the write is triggered immediately without
  ## waiting until the next flush_interval.
  # max_undelivered_messages = 1000

  ## Auth method. PLAIN and EXTERNAL are supported
  ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
  ## described here: https://www.rabbitmq.com/plugins.html
  # auth_method = "PLAIN"

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## Content encoding for message payloads, can be set to "gzip" to or
  ## "identity" to apply no encoding.
  # content_encoding = "identity"

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"
...

Logs from Telegraf

2023-08-09T20:53:22Z D! [agent] Starting service inputs
2023-08-09T20:53:22Z D! [inputs.amqp_consumer] Connecting to "amqp://localhost:5672"
2023-08-09T20:53:23Z D! [inputs.amqp_consumer] Error connecting to "amqp://localhost:5672"

System info

Telegraf 1.26.3, Windows Server 2019

Docker

No response

Steps to reproduce

  1. System reboot
  2. Telegraf start faster than RabbitMQ
  3. Telegraf can't connect to RabbitMQ -> error logs: _2023-08-09T20:53:22Z D! [agent] Starting service inputs 2023-08-09T20:53:22Z D! [inputs.amqp_consumer] Connecting to "amqp://localhost:5672" 2023-08-09T20:53:23Z D! [inputs.amqpconsumer] Error connecting to "amqp://localhost:5672"
  4. RabbitMQ started meanwhile
  5. Telegraf does not retry to connect, messages are not processed ...

Expected behavior

In case of Telegraf fails to connect to AMQP broker, it should perform some retries

Actual behavior

No connection retry is performed based on the error described. A manual restart of Telegraf has to be performed in order to re-establish the connection

Additional info

I can't find a configuration for connection retries/backoff based on described error scenario. In case of Telegraf can't establish a connection to RabbitMQ via the input amqp_consumer plugin there should be a way to perform retries.

powersj commented 11 months ago

Hi,

Happy to see a PR which adds a config option added to the plugin that enables retries, such as:

  ## Specifies plugin behavior regarding disconnected servers
  ## Available choices :
  ##   - error: telegraf will return an error on startup if one the servers is unreachable
  ##   - ignore: telegraf will ignore unreachable servers on both startup and gather
  # disconnected_servers_behavior = "error"
matteoberts commented 11 months ago

Hi @powersj,

Thanks a lot for the feedback! I'm glad to see the addition of the new config option to the plugin, it's a great enhancement.

I had a question regarding the PR: will there be a connection retry mechanism included to handle the scenario we discussed? Or is there a different approach planned for handling it? I'm curious to know how this will be addressed.

Looking forward to hearing more about it. Thanks again for your work on this!

Regards, Matteo

powersj commented 11 months ago

Yes - whoever adds this could add another option like "retry" where the plugin could retry the connection at each interval.

srebhan commented 3 months ago

@matteoberts could you please try the binary in PR #15145 with startup_error_behavior = "retry" and let me know if this is the behavior you want!?!?

matteoberts commented 2 weeks ago

@matteoberts could you please try the binary in PR #15145 with startup_error_behavior = "retry" and let me know if this is the behavior you want!?!?

Hi @srebhan ,

Sorry for the late reply. I've tested it and it worked pretty well. Thank you very much for the support!