influxdata / telegraf

Agent for collecting, processing, aggregating, and writing metrics, logs, and other arbitrary data.
https://influxdata.com/telegraf
MIT License

telegraf not recovering after mosquitto restart. #6232

Closed vincems closed 5 years ago

vincems commented 5 years ago

Relevant telegraf.conf:

[global_tags]
[agent]
  interval = "10s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = ""
  debug = true
  quiet = false
  logfile = "/var/log/telegraf/telegraf.log"
  hostname = ""
  omit_hostname = false
[[outputs.influxdb]]
  urls = ["http://127.0.0.1:8086"]
  database = "dbname"
  username = "dbuser"
  password = "dbpass"
  user_agent = "telegraf"

[[inputs.mqtt_consumer]]
  servers = ["tcp://myhost:1883"]
  qos = 0
  connection_timeout = "30s"
  topics = ["topic1"]
  persistent_session = true
  client_id = "telegraf_things"
  username = "mqttuser"
  password = "mqttpass"
  data_format = "json"

System info:

mosquitto version 1.4.15 (build date 2018-07-24 13:34:50+1200)
InfluxDB v1.7.7 (git: 1.7 f8fdf652f348fc9980997fe1c972e2b79ddd13b0)
Telegraf 1.11.4 (git: HEAD d9ca76e4)


Steps to reproduce:

  1. Start mosquitto
  2. Start influxd
  3. Start telegraf
  4. Publish some data to the mosquitto topic
  5. Restart mosquitto
  6. Publish more data to the mosquitto topic

Expected behavior:

Watching the mosquitto log, you should see the initial startup sequence (CONNACK, SUBSCRIBE, SUBACK, PUBLISH) repeated after the restart:

1565310635: Sending CONNACK to telegraf_things (1, 0)
1565310635: Received SUBSCRIBE from telegraf_things
1565310635: Sending SUBACK to telegraf_things
1565310637: Sending PUBLISH to telegraf_things (d0, q0, r0, m0, 'topic1', ... (32 bytes))
... restart mosquitto ...
1565310691: Sending CONNACK to telegraf_things (0, 0)
1565310635: Received SUBSCRIBE from telegraf_things
1565310635: Sending SUBACK to telegraf_things
1565310637: Sending PUBLISH to telegraf_things (d0, q0, r0, m0, 'topic1', ... (32 bytes))

Actual behavior:

1565310635: Sending CONNACK to telegraf_things (1, 0)
1565310635: Received SUBSCRIBE from telegraf_things
1565310635: Sending SUBACK to telegraf_things
1565310637: Sending PUBLISH to telegraf_things (d0, q0, r0, m0, 'topic1', ... (32 bytes))
... restart mosquitto ...
1565310691: Sending CONNACK to telegraf_things (0, 0)
1565310751: Received PINGREQ from telegraf_things
1565310751: Sending PINGRESP to telegraf_things

Additional info:

If you wait long enough, you will see the PINGs coming through, but never a SUBSCRIBE or any published data.

Excerpt from the telegraf log after restarting mosquitto (timestamps removed for clarity):

E! [inputs.mqtt_consumer]: Error in plugin: connection lost: EOF
D! [inputs.mqtt_consumer] Disconnected [tcp://myhost:1883]
D! [outputs.influxdb] buffer fullness: 0 / 10000 metrics.
D! [inputs.mqtt_consumer] Connecting [tcp://myhost:1883]
I! [inputs.mqtt_consumer] Connected [tcp://myhost:1883]
D! [outputs.influxdb] buffer fullness: 0 / 10000 metrics.

danielnelson commented 5 years ago

Took a quick peek at the code. Does disabling persistent session help?

danielnelson commented 5 years ago

If so, I think we just need to make use of the enhancement mentioned in this paho MQTT library issue: https://github.com/eclipse/paho.mqtt.golang/issues/240

vincems commented 5 years ago

Indeed it does. Good workaround.

vincems commented 5 years ago

Despite being far from an adept Go developer, and pretty new to this code, I've been doing more digging into this. It appears that what we're seeing here is an intentional design decision made by the paho.mqtt client developers.

See https://github.com/eclipse/paho.mqtt.c/issues/221

In that case, I believe the approach taken in the application code (mqtt_consumer.go, lines 178-193) needs to be looked at. If the paho library doesn't re-subscribe automatically, then it's up to the client application to do so.

Most of the work is already done. We just need to flag the loss of the session, then re-subscribe once the connection is re-established, and Bob's your auntie.
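To make the idea concrete, here is a rough sketch of the "re-subscribe when the session is gone" logic. It is not the Telegraf or paho code: `broker`, `consumer`, and `onConnect` are hypothetical names, and the broker is an in-memory stand-in so the example runs with the standard library only. It assumes the first number in mosquitto's CONNACK log line is the session-present flag, which would explain the `(1, 0)` before the restart and the `(0, 0)` after it. In the real plugin, the equivalent hook would presumably be paho's on-connect handler.

```go
package main

import "fmt"

// broker is a hypothetical in-memory stand-in for an MQTT broker.
// It only models what matters here: which client IDs have a stored
// session, and which topics that session is subscribed to.
type broker struct {
	sessions map[string][]string // client ID -> subscribed topics
}

func newBroker() *broker {
	return &broker{sessions: map[string][]string{}}
}

// connect returns the CONNACK "session present" flag for clientID and
// creates an empty session if none was stored.
func (b *broker) connect(clientID string) (sessionPresent bool) {
	_, sessionPresent = b.sessions[clientID]
	if !sessionPresent {
		b.sessions[clientID] = nil
	}
	return sessionPresent
}

// subscribe records a topic subscription in the client's session.
func (b *broker) subscribe(clientID, topic string) {
	b.sessions[clientID] = append(b.sessions[clientID], topic)
}

// restart simulates a broker restart that loses all stored sessions.
func (b *broker) restart() {
	b.sessions = map[string][]string{}
}

// consumer is a hypothetical, stripped-down model of a persistent-session
// MQTT consumer.
type consumer struct {
	clientID       string
	topics         []string
	subscribeCalls int // number of SUBSCRIBE requests sent so far
}

// onConnect runs after every connect or reconnect. It re-subscribes only
// when the broker reports that it holds no session for this client.
func (c *consumer) onConnect(b *broker) {
	if b.connect(c.clientID) {
		return // broker still has our subscriptions
	}
	for _, t := range c.topics {
		b.subscribe(c.clientID, t)
		c.subscribeCalls++
	}
}

func main() {
	b := newBroker()
	c := &consumer{clientID: "telegraf_things", topics: []string{"topic1"}}

	c.onConnect(b) // first connect: no session yet, so subscribe
	c.onConnect(b) // reconnect with session present: nothing to do
	b.restart()    // broker loses the session
	c.onConnect(b) // reconnect without session: subscribe again

	fmt.Println("subscribe calls:", c.subscribeCalls)
	fmt.Println("topics on broker:", b.sessions[c.clientID])
}
```

The point of checking the flag rather than always re-subscribing is that a persistent session that survived on the broker does not need a second SUBSCRIBE, while one that was lost does.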

vincems commented 5 years ago

Unless, of course, said enhancement happens.