influxdata / telegraf

Agent for collecting, processing, aggregating, and writing metrics, logs, and other arbitrary data.
https://influxdata.com/telegraf
MIT License

MQTT consumer plugin connected to VerneMQ but no subscriptions created #13338

Closed: jcoliz closed this issue 1 year ago

jcoliz commented 1 year ago

Relevant telegraf.conf

EDIT: I have reduced this configuration to make the repro simpler.

[agent]
  ## Default data collection interval for all inputs
  interval = "10s"
  ## Log at debug level.
  debug = true
  ## Log only error level messages.
  quiet = false

[[inputs.mqtt_consumer]]
  servers = [ "tcp://mqtt:1883" ]
  topics = [ "spB1.0/testing/#" ]
  persistent_session = true
  qos = 1
  client_id = "telegraf"
  data_format = "value"
  data_type = "string"

[[outputs.influxdb_v2]]
  urls = ["http://influxdb:8086"]
  token = "12345678"
  organization = "brewhub"
  bucket = "dockerism"

[[outputs.file]]
  files = ["stdout"]

Logs from Telegraf

telegraf  | 2023-05-25T18:07:22Z I! Loading config: /etc/telegraf/telegraf.conf
telegraf  | 2023-05-25T18:07:22Z I! Starting Telegraf 1.26.3
telegraf  | 2023-05-25T18:07:22Z I! Available plugins: 235 inputs, 9 aggregators, 27 processors, 22 parsers, 57 outputs, 2 secret-stores
telegraf  | 2023-05-25T18:07:22Z I! Loaded inputs: mqtt_consumer
telegraf  | 2023-05-25T18:07:22Z I! Loaded aggregators:
telegraf  | 2023-05-25T18:07:22Z I! Loaded processors:
telegraf  | 2023-05-25T18:07:22Z I! Loaded secretstores:
telegraf  | 2023-05-25T18:07:22Z I! Loaded outputs: file influxdb_v2
telegraf  | 2023-05-25T18:07:22Z I! Tags enabled: host=912a47b36da1
telegraf  | 2023-05-25T18:07:22Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:"912a47b36da1", Flush Interval:10s
telegraf  | 2023-05-25T18:07:22Z D! [agent] Initializing plugins
telegraf  | 2023-05-25T18:07:22Z W! [inputs.mqtt_consumer] Server "mqtt:1883" should be updated to use `scheme://host:port` format
telegraf  | 2023-05-25T18:07:22Z D! [agent] Connecting outputs
telegraf  | 2023-05-25T18:07:22Z D! [agent] Attempting connection to [outputs.influxdb_v2]
telegraf  | 2023-05-25T18:07:22Z D! [agent] Successfully connected to outputs.influxdb_v2
telegraf  | 2023-05-25T18:07:22Z D! [agent] Attempting connection to [outputs.file]
telegraf  | 2023-05-25T18:07:22Z D! [agent] Successfully connected to outputs.file
telegraf  | 2023-05-25T18:07:22Z D! [agent] Starting service inputs
telegraf  | 2023-05-25T18:07:22Z I! [inputs.mqtt_consumer] Connected [mqtt:1883]
telegraf  | 2023-05-25T18:07:22Z D! [inputs.mqtt_consumer] Session found [mqtt:1883]
telegraf  | 2023-05-25T18:07:32Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
telegraf  | 2023-05-25T18:07:32Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 10000 metrics
telegraf  | 2023-05-25T18:07:42Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 10000 metrics

System info

Telegraf 1.26.3, Docker version 20.10.21, build baeda1f, Ubuntu 20.04.5 LTS (on WSL)

Docker

version: '3.6'
services:
  mqtt:
    image: vernemq/vernemq:1.12.6.2
    container_name: vernemq
    restart: always
    ports:
      - '1883:1883'
    environment:
      - DOCKER_VERNEMQ_ACCEPT_EULA=yes
      - DOCKER_VERNEMQ_ALLOW_ANONYMOUS=on
    volumes:
      - vernemq_data:/var/lib/vernemq
  telegraf:
    image: telegraf:1.26.3-alpine
    container_name: telegraf
    restart: always
    volumes:
    - ./telegraf.conf:/etc/telegraf/telegraf.conf:ro
    depends_on:
      - influxdb
      - mqtt
    links:
      - influxdb
    ports:
    - '8125:8125'
  influxdb:
    image: influxdb:2.7.1-alpine
    container_name: influxdb
    restart: always
    environment:
      - DOCKER_INFLUXDB_INIT_MODE=setup
      - DOCKER_INFLUXDB_INIT_USERNAME=brewhub
      - DOCKER_INFLUXDB_INIT_PASSWORD=brewhub
      - DOCKER_INFLUXDB_INIT_ORG=brewhub
      - DOCKER_INFLUXDB_INIT_BUCKET=dockerism
      - DOCKER_INFLUXDB_INIT_RETENTION=365d
      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=12345678
    ports:
      - '8086:8086'
    volumes:
      - influxdb_data:/var/lib/influxdb2
volumes:
  influxdb_data: {}
  vernemq_data: {}

Steps to reproduce

  1. docker compose up
  2. Send MQTT messages from local machine (not in docker network)
  3. Observe VerneMQ sessions (connections and subscriptions)

Expected behavior

I expected Telegraf to create a subscription for the configured topics.

For example, suppose another client subscribes. Here I have my sender ALSO subscribe, which shows what a subscription should look like:

$ vmq-admin session show --client_id --topic
+------------+------------------+
| client_id  | topic            |
+------------+------------------+
| mqttsender | spB1.0/testing/# |
+------------+------------------+

And of course, my client gets plenty of messages, so we know VerneMQ is working.

[11:20:43 INF] Message recieved: {"Timestamp":1685038843643,"Seq":228,"Model":"dtmi:com:example:TemperatureController;2","WorkingSet":342112}
[11:20:43 INF] Message recieved: {"Timestamp":1685038843643,"Seq":228,"Model":"dtmi:com:example:Thermostat;1","Temperature":30.43704798532389}
[11:20:43 INF] Message recieved: {"Timestamp":1685038843643,"Seq":228,"Model":"dtmi:com:example:Thermostat;1","Temperature":30.109562092634533}
[11:20:44 INF] Message recieved: {"Timestamp":1685038844144,"Seq":229,"Model":"dtmi:com:example:TemperatureController;2","WorkingSet":342464}
[11:20:44 INF] Message recieved: {"Timestamp":1685038844144,"Seq":229,"Model":"dtmi:com:example:Thermostat;1","Temperature":36.617387872822825}
[11:20:44 INF] Message recieved: {"Timestamp":1685038844144,"Seq":229,"Model":"dtmi:com:example:Thermostat;1","Temperature":41.865267138484}

Actual behavior

Telegraf does connect

$ vmq-admin session show
+------------+-----------+------------+---------------+-----------+-----------+
| client_id  | is_online | mountpoint | peer_host     | peer_port | user      |
+------------+-----------+------------+---------------+-----------+-----------+
| mqttsender | true      |            | 192.168.176.1 | 34796     | undefined |
+------------+-----------+------------+---------------+-----------+-----------+
| telegraf   | true      |            | 192.168.176.4 | 37216     | undefined |
+------------+-----------+------------+---------------+-----------+-----------+

Telegraf does not subscribe to any topics

$ vmq-admin session show --client_id --topic
[No output shown]

We can see that the messages are retained, and of course, subscribing to them from another client works fine.

$ vmq-admin retain show
+---------------------------------------------------------------------------------------------------------------+----------------------------------------------+
| payload                                                                                                       | topic                                        |
+---------------------------------------------------------------------------------------------------------------+----------------------------------------------+
| {"Timestamp":1685032014787,"Seq":47,"Model":"dtmi:com:example:TemperatureController;2","WorkingSet":319840}   | spB1.0/testing/NDATA/device-2                |
+---------------------------------------------------------------------------------------------------------------+----------------------------------------------+
| {"Timestamp":1685032015295,"Seq":48,"Model":"dtmi:com:example:TemperatureController;2","WorkingSet":320032}   | spB1.0/testing/NDATA/device-1                |
+---------------------------------------------------------------------------------------------------------------+----------------------------------------------+
| {"Timestamp":1685032014787,"Seq":47,"Model":"dtmi:com:example:Thermostat;1","Temperature":61.75570504584947}  | spB1.0/testing/DDATA/device-2/thermostat2    |
+---------------------------------------------------------------------------------------------------------------+----------------------------------------------+
| {"Timestamp":1685032014787,"Seq":47,"Model":"dtmi:com:example:Thermostat;1","Temperature":56.180339887498945} | spB1.0/testing/DDATA/device-2/thermostat1    |
+---------------------------------------------------------------------------------------------------------------+----------------------------------------------+
| {"Timestamp":1685032015295,"Seq":48,"Model":"dtmi:com:example:Thermostat;1","Temperature":45.8417661836448}   | spB1.0/testing/DDATA/device-1/thermostat2    |
+---------------------------------------------------------------------------------------------------------------+----------------------------------------------+
| {"Timestamp":1685032015295,"Seq":48,"Model":"dtmi:com:example:Thermostat;1","Temperature":39.99999999999999}  | spB1.0/testing/DDATA/device-1/thermostat1    |
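
For reference, every retained topic listed above falls under the configured filter `spB1.0/testing/#`, so a working subscription should have delivered all of them. The following is a minimal sketch of MQTT topic-filter matching in Python. The function name `topic_matches` is hypothetical and is not part of Telegraf, VerneMQ, or any MQTT library, and the sketch ignores the special handling of topics that begin with `$`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches the MQTT `topic_filter`.

    `+` matches exactly one topic level. `#` matches the parent level and
    any number of levels below it, and is only valid as the last level.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches everything from here on,
            # provided it is the final level of the filter.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # A literal level that does not match.
            return False

    # No `#` was seen, so both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)
```

Under this sketch, `topic_matches("spB1.0/testing/#", "spB1.0/testing/NDATA/device-2")` is true, while a topic outside `spB1.0/testing/` would not match.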

Also, we can look at the client trace. Here we can see that the telegraf client periodically pings the server, which dutifully responds.

$ vmq-admin trace client client-id=telegraf  

2023-05-25T17:38:07Z Starting trace for 1 existing sessions for client "telegraf" with PIDs
    [<8676.591.0>]
2023-05-25T17:38:30Z <8676.591.0> MQTT RECV: CID: "telegraf" PINGREQ()
2023-05-25T17:38:30Z <8676.591.0> MQTT SEND: CID: "telegraf" PINGRESP()

Meanwhile, we can look at the trace from the sender, and it's chugging away great.

$ vmq-admin trace client client-id=mqttsender

2023-05-25T17:37:53Z Starting trace for 1 existing sessions for client "mqttsender" with PIDs
    [<8676.655.0>]
2023-05-25T17:37:54Z <8676.655.0> MQTT RECV: CID: "mqttsender" PUBLISH(d0, q2, r1, m1864, "spB1.0/testing/NDATA/device-2") with payload:
    {"Timestamp":1685036274062,"Seq":621,"Model":"dtmi:com:example:TemperatureController;2","WorkingSet":342752}
2023-05-25T17:37:54Z <8676.655.0> Calling auth_on_publish(undefined,{[],
                                                                     <<"mqttsender">>},2,spB1.0/testing/NDATA/device-2,true) with payload:
    {"Timestamp":1685036274062,"Seq":621,"Model":"dtmi:com:example:TemperatureController;2","WorkingSet":342752}
2023-05-25T17:37:54Z <8676.655.0> Hook returned "ok"
2023-05-25T17:37:54Z <8676.655.0> MQTT SEND: CID: "mqttsender" PUBREC(m1864)
2023-05-25T17:37:54Z <8676.655.0> MQTT RECV: CID: "mqttsender" PUBREL(m1864)
2023-05-25T17:37:54Z <8676.655.0> MQTT SEND: CID: "mqttsender" PUBCOMP(m1864)
2023-05-25T17:37:54Z <8676.655.0> MQTT RECV: CID: "mqttsender" PUBLISH(d0, q2, r1, m1865, "spB1.0/testing/DDATA/device-2/thermostat1") with payload:
    {"Timestamp":1685036274062,"Seq":621,"Model":"dtmi:com:example:Thermostat;1","Temperature":56.180339887498945}
2023-05-25T17:37:54Z <8676.655.0> Calling auth_on_publish(undefined,{[],
                                                                     <<"mqttsender">>},2,spB1.0/testing/DDATA/device-2/thermostat1,true) with payload:
    {"Timestamp":1685036274062,"Seq":621,"Model":"dtmi:com:example:Thermostat;1","Temperature":56.180339887498945}
2023-05-25T17:37:54Z <8676.655.0> Hook returned "ok"
2023-05-25T17:37:54Z <8676.655.0> MQTT SEND: CID: "mqttsender" PUBREC(m1865)
2023-05-25T17:37:54Z <8676.655.0> MQTT RECV: CID: "mqttsender" PUBREL(m1865)
2023-05-25T17:37:54Z Trace rate limit triggered.
Done.

If we use Mosquitto instead, it works correctly. To try this, replace the mqtt section in the docker compose file with the following. There is nothing special in the mosquitto.conf; it just allows anonymous connections.

  mqtt:
    image: eclipse-mosquitto:2.0.15
    container_name: mosquitto
    restart: always
    ports:
      - '1883:1883'
      - '9001:9001'
    volumes:
      - ./mosquitto.conf:/mosquitto/config/mosquitto.conf

In this case we can see the expected output in the telegraf docker logs:

telegraf   | mqtt_consumer,host=1f8f3992b748,topic=spB1.0/testing/NDATA/device-1 value="{\"Timestamp\":1685058951186,\"Seq\":6,\"Model\":\"dtmi:com:example:TemperatureController;2\",\"WorkingSet\":292000}" 1685058951191578383
telegraf   | mqtt_consumer,host=1f8f3992b748,topic=spB1.0/testing/DDATA/device-1/thermostat1 value="{\"Timestamp\":1685058951186,\"Seq\":6,\"Model\":\"dtmi:com:example:Thermostat;1\",\"Temperature\":33.81966011250105}" 1685058951192308246
telegraf   | mqtt_consumer,host=1f8f3992b748,topic=spB1.0/testing/DDATA/device-1/thermostat2 value="{\"Timestamp\":1685058951186,\"Seq\":6,\"Model\":\"dtmi:com:example:Thermostat;1\",\"Temperature\":38.24429495415053}" 1685058951192994161
telegraf   | mqtt_consumer,host=1f8f3992b748,topic=spB1.0/testing/NDATA/device-2 value="{\"Timestamp\":1685058951687,\"Seq\":7,\"Model\":\"dtmi:com:example:TemperatureController;2\",\"WorkingSet\":292832}" 1685058951692530844
telegraf   | mqtt_consumer,host=1f8f3992b748,topic=spB1.0/testing/DDATA/device-2/thermostat1 value="{\"Timestamp\":1685058951687,\"Seq\":7,\"Model\":\"dtmi:com:example:Thermostat;1\",\"Temperature\":49.99999999999999}" 1685058951693261994
telegraf   | mqtt_consumer,host=1f8f3992b748,topic=spB1.0/testing/DDATA/device-2/thermostat2 value="{\"Timestamp\":1685058951687,\"Seq\":7,\"Model\":\"dtmi:com:example:Thermostat;1\",\"Temperature\":56.18033988749

Additional info

This looks like exactly the same behaviour as #9736, which involved ActiveMQ Artemis rather than VerneMQ.

Also note that I have gotten an old version of Telegraf to work with an old version of VerneMQ when running directly on the metal (not Docker), but that was a couple of years ago, and I no longer have those configs around.

powersj commented 1 year ago

Hi,

If this works with Mosquitto but not VerneMQ, that points to the client (e.g. paho.mqtt.golang) or to VerneMQ rejecting something. I have put up https://github.com/influxdata/telegraf/pull/13352, which has the client's debug logs enabled. In 20-30 minutes some artifacts will be attached that you can use. Could you please download the artifact for your platform, launch it, and provide those logs?

Thanks

jcoliz commented 1 year ago

@powersj Thanks for having a look! I am away for a long weekend. Will try this next week.

jcoliz commented 1 year ago

@powersj Thanks again. I think I have this sorted now. In the process of preparing a clean repro environment to apply your version, I discovered the problem.

VerneMQ takes a long time to come up. In that time, Telegraf tries and fails to start multiple times. Something in that interaction appears to cause the problem between the two components.

I've since added a service_healthy condition to Telegraf's dependency on the broker, and it has worked every time since then (illustrated below).

services:
  mqtt:
    image: vernemq/vernemq:1.12.6.2
    healthcheck:
      test: curl --fail http://localhost:8888/health || exit 1
      interval: 20s
      retries: 5
      start_period: 10s
      timeout: 5s
  telegraf:
    image: telegraf:1.26.3-alpine
    depends_on:
      influxdb:
        condition: service_started
      mqtt:
        condition: service_healthy    
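
The same "wait until the broker is healthy" idea can be sketched outside of Docker Compose, for setups where `depends_on` conditions are not available. This is only an illustrative sketch, not how Docker implements health checks: the helper names `http_ok` and `wait_until_ready` are hypothetical, and the defaults simply mirror the `retries`, `interval`, and `timeout` values from the healthcheck above.

```python
import http.client
import time
import urllib.request
from typing import Callable


def http_ok(url: str, timeout: float = 5.0) -> bool:
    """Return True if a GET request to `url` answers with a 2xx status."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return 200 <= response.status < 300
    except (OSError, http.client.HTTPException):
        # Connection refused, timeout, DNS failure, HTTP error status,
        # or a malformed HTTP response.
        return False


def wait_until_ready(
    probe: Callable[[], bool],
    retries: int = 5,
    interval: float = 20.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Call `probe` up to `retries` times, pausing `interval` seconds
    between attempts. Return True as soon as one probe succeeds."""
    for attempt in range(retries):
        if probe():
            return True
        if attempt < retries - 1:
            # Only pause when another attempt will follow.
            sleep(interval)
    return False
```

A caller could then run `wait_until_ready(lambda: http_ok("http://localhost:8888/health"))` before starting Telegraf, assuming the VerneMQ health endpoint is reachable from wherever the script runs. The `sleep` parameter is injectable so the loop can be exercised without real delays.
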

powersj commented 1 year ago

Awesome! Glad to hear you got it worked out, and thank you for sharing what you did to resolve it.

I'll close this and the PR then.