influxdata / telegraf

Agent for collecting, processing, aggregating, and writing metrics, logs, and other arbitrary data.
https://influxdata.com/telegraf
MIT License

mqtt_consumer stops working (again) after receiving initial message (json_v2 used) #13922

Closed kratsg closed 1 year ago

kratsg commented 1 year ago

Relevant telegraf.conf

[agent]
  interval = "10s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = "1s"
  debug = true
  quiet = false
  logfile = ""
  hostname = "itkpix-srv"

[[inputs.mqtt_consumer]]
  servers = ["tcp://mqtt:1883"]
  topics = [
    "scipp/cleanroom_test/monitoring/#",
  ]

  topic_tag = "topic"
  qos = 2
  persistent_session = true
  client_id = "telegraf"
  data_format = "json_v2"
  username = "telegraf"
  password = "telegraf"

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "+/+/+/+/+"
    tags = "institute/location/_/stand/_"
    measurement = "_/_/_/_/measurement"

  [[inputs.mqtt_consumer.json_v2]]
    measurement_name_path = "@this.name"
    timestamp_path = "timestamp"
    timestamp_format = "unix"
    [[inputs.mqtt_consumer.json_v2.field]]
      path = "value"
      type = "float"
    [[inputs.mqtt_consumer.json_v2.object]]
      path = "@this"
      disable_prepend_keys = true
      excluded_keys = ["name", "value"]
      [[inputs.mqtt_consumer.json_v2.object.tag]]
        path = "tags"

[[outputs.file]]
  files = ["stdout"]
  data_format = "influx"
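
For readers unfamiliar with `topic_parsing`, the mapping configured above can be pictured with a small Python sketch. This is an illustrative re-implementation, not Telegraf's code: the function name `parse_topic` is hypothetical, and it assumes `_` skips a segment while any other word names the tag (or marks the measurement) taken from the same position in the topic.

```python
def parse_topic(topic, tags_pattern, measurement_pattern):
    """Map topic segments to a measurement name and tags (illustrative only)."""
    segments = topic.split("/")
    tag_keys = tags_pattern.split("/")
    meas_keys = measurement_pattern.split("/")
    if not (len(segments) == len(tag_keys) == len(meas_keys)):
        raise ValueError("topic and patterns must have the same number of segments")

    # Every non-"_" entry in the tags pattern becomes a tag key.
    tags = {key: seg for key, seg in zip(tag_keys, segments) if key != "_"}
    # The first non-"_" entry in the measurement pattern selects the measurement.
    measurement = next(
        (seg for key, seg in zip(meas_keys, segments) if key != "_"), None
    )
    return measurement, tags


measurement, tags = parse_topic(
    "scipp/cleanroom_test/monitoring/beta/giordon",
    "institute/location/_/stand/_",
    "_/_/_/_/measurement",
)
print(measurement, tags)
```

Under these assumptions the result is `giordon` with `institute`, `location` and `stand` tags, which agrees with the single metric line that appears in the logs.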

Logs from Telegraf

2023-09-13T22:01:38Z I! Loading config: /etc/telegraf/telegraf.conf
2023-09-13T22:01:38Z I! Starting Telegraf 1.28.1 brought to you by InfluxData the makers of InfluxDB
2023-09-13T22:01:38Z I! Available plugins: 240 inputs, 9 aggregators, 29 processors, 24 parsers, 59 outputs, 5 secret-stores
2023-09-13T22:01:38Z I! Loaded inputs: mqtt_consumer
2023-09-13T22:01:38Z I! Loaded aggregators:
2023-09-13T22:01:38Z I! Loaded processors:
2023-09-13T22:01:38Z I! Loaded secretstores:
2023-09-13T22:01:38Z I! Loaded outputs: file
2023-09-13T22:01:38Z I! Tags enabled: host=itkpix-srv
2023-09-13T22:01:38Z I! [agent] Config: Interval:1s, Quiet:false, Hostname:"itkpix-srv", Flush Interval:10s
2023-09-13T22:01:38Z D! [agent] Initializing plugins
2023-09-13T22:01:38Z D! [agent] Connecting outputs
2023-09-13T22:01:38Z D! [agent] Attempting connection to [outputs.file]
2023-09-13T22:01:38Z D! [agent] Successfully connected to outputs.file
2023-09-13T22:01:38Z D! [agent] Starting service inputs
2023-09-13T22:01:38Z I! [inputs.mqtt_consumer] Connected [tcp://mqtt:1883]
2023-09-13T22:01:38Z D! [inputs.mqtt_consumer] Session found [tcp://mqtt:1883]
giordon,host=itkpix-srv,institute=scipp,location=cleanroom_test,make=Digikey,model=HYT271,stand=beta,topic=scipp/cleanroom_test/monitoring/beta/giordon value=15.495400003055648 1694642499000000000
2023-09-13T22:01:48Z D! [outputs.file] Wrote batch of 1 metrics in 60.384µs
2023-09-13T22:01:48Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2023-09-13T22:01:58Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2023-09-13T22:02:08Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2023-09-13T22:02:18Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2023-09-13T22:02:28Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2023-09-13T22:02:38Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2023-09-13T22:02:48Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2023-09-13T22:02:58Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2023-09-13T22:03:08Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2023-09-13T22:03:18Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2023-09-13T22:03:28Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2023-09-13T22:03:38Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2023-09-13T22:03:48Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2023-09-13T22:03:58Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2023-09-13T22:04:08Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2023-09-13T22:04:18Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2023-09-13T22:04:28Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
(continues like this ad nauseam)
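
When comparing the one metric that does get written against the log timestamps, it can help to convert its nanosecond timestamp to UTC. A minimal sketch, assuming the line is in InfluxDB line protocol with the timestamp as the last space-separated token; the helper name `metric_time` is made up for this example.

```python
from datetime import datetime, timezone


def metric_time(line):
    """Return the UTC time of a line-protocol metric with a nanosecond timestamp."""
    # The timestamp is the last space-separated token on the line.
    ns = int(line.rsplit(" ", 1)[1])
    return datetime.fromtimestamp(ns // 1_000_000_000, tz=timezone.utc)


line = (
    "giordon,host=itkpix-srv,institute=scipp,location=cleanroom_test,"
    "make=Digikey,model=HYT271,stand=beta,"
    "topic=scipp/cleanroom_test/monitoring/beta/giordon "
    "value=15.495400003055648 1694642499000000000"
)
print(metric_time(line).isoformat())  # 2023-09-13T22:01:39+00:00
```

The decoded time falls within a second of the moment Telegraf connected, so the metric that is written corresponds to a message published right at startup.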

System info

Telegraf 1.28.1, CentOS7, all dockerized

Docker

  telegraf:
    image: telegraf:1.28.1
    restart: unless-stopped
    container_name: "telegraf"
    volumes:
      - ./config/telegraf/telegraf.conf:/etc/telegraf/telegraf.conf:ro
      # For docker stats
      - /var/run/docker.sock:/var/run/docker.sock:ro
    depends_on:
      - mqtt
    labels:
      - "traefik.enable=false"
    user: telegraf:980  # stat -c '%g' /var/run/docker.sock
    networks:
      - internal

  mqtt:
    image: "eclipse-mosquitto"
    container_name: "mosquitto"
    volumes:
      - "./config/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf"
    labels:
      - "traefik.enable=true"
      # note: due to limitations with canary and TLS - disable TLS and rely on IP filtering
      - "traefik.tcp.routers.mqtt.rule=(HostSNI(`*`))"
      - "traefik.tcp.routers.mqtt.entrypoints=mqtt"
      - "traefik.tcp.services.mqtt.loadbalancer.server.port=1883"
      - "traefik.tcp.routers.mqtt.middlewares=allowed-ips@docker"
    networks:
      - internal

Steps to reproduce

  1. Run the Python script below (it connects to mosquitto and sends a message every second)
  2. docker-compose stop telegraf && docker-compose up -d telegraf && docker logs -f telegraf
  3. Observe logs show only one value being included and nothing else

Expected behavior

Multiple entries recorded, rather than just one and done, or at least some sort of error that I've done something wrong...

Actual behavior

One and done. Looks a lot like #13476 but with no errors at all.

Additional info

I use traefik to bring it all up, but I've pulled out the only snippets in the docker-compose that deal with mosquitto + telegraf.

import json
import logging
import random
import time

import paho.mqtt.client as mqtt

logging.basicConfig()
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)


def on_connect(client, _userdata, _flags, rc):
    # rc == 0 means the broker accepted the connection.
    log.info("Connected with result code %s", rc)


client = mqtt.Client()
client.on_connect = on_connect
client.connect("HOSTNAME", 8883, 60)
client.loop_start()  # run the network loop in a background thread

try:
    # Publish one sample reading per second until interrupted (Ctrl+C).
    while True:
        msg = json.dumps({
            "name": "giordon",
            "value": 50.0 * random.random(),
            "timestamp": int(time.time()),
            "tags": [{"make": "Digikey", "model": "HYT271"}],
        })
        client.publish("scipp/cleanroom_test/monitoring/beta/giordon", msg)
        time.sleep(1)
except KeyboardInterrupt:
    pass
finally:
    # Previously unreachable after the infinite loop; now runs on exit.
    client.loop_stop()

kratsg commented 1 year ago

Additional notes: I checked that the messages make it into mosquitto via this script

import paho.mqtt.client as mqtt

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))

    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe("#")

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect("HOSTNAME", 8883, 60)

# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()

which is showing me all the (multiple) messages coming in. Looking at the mosquitto logs doesn't show anything revealing either

1694642498: New connection from 172.22.0.14:59182 on port 1883.
1694642498: New client connected from 172.22.0.14:59182 as auto-83719E37-CEC9-D2DE-770A-E228912C43C6 (p2, c1, k60).
1694643006: New connection from 172.22.0.16:55156 on port 1883.
1694643006: New client connected from 172.22.0.16:55156 as telegraf (p2, c0, k60, u'telegraf').

powersj commented 1 year ago

Please add client_trace = false to your mqtt_consumer config and provide the logs with additional details.

kratsg commented 1 year ago

Please add client_trace = false to your mqtt_consumer config and provide the logs with additional details.

you mean = true right?

logs with client_trace = true on mqtt_consumer

```
2023-09-13T22:28:37Z I! Loading config: /etc/telegraf/telegraf.conf
2023-09-13T22:28:37Z I! Starting Telegraf 1.28.1 brought to you by InfluxData the makers of InfluxDB
2023-09-13T22:28:37Z I! Available plugins: 240 inputs, 9 aggregators, 29 processors, 24 parsers, 59 outputs, 5 secret-stores
2023-09-13T22:28:37Z I! Loaded inputs: mqtt_consumer
2023-09-13T22:28:37Z I! Loaded aggregators:
2023-09-13T22:28:37Z I! Loaded processors:
2023-09-13T22:28:37Z I! Loaded secretstores:
2023-09-13T22:28:37Z I! Loaded outputs: file
2023-09-13T22:28:37Z I! Tags enabled: host=itkpix-srv
2023-09-13T22:28:37Z I! [agent] Config: Interval:1s, Quiet:false, Hostname:"itkpix-srv", Flush Interval:10s
2023-09-13T22:28:37Z D! [agent] Initializing plugins
2023-09-13T22:28:37Z D! [agent] Connecting outputs
2023-09-13T22:28:37Z D! [agent] Attempting connection to [outputs.file]
2023-09-13T22:28:37Z D! [agent] Successfully connected to outputs.file
2023-09-13T22:28:37Z D! [agent] Starting service inputs
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [client] Connect()
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [store] memorystore initialized
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [client] about to write new connect msg
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [client] socket connected to broker
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [client] Using MQTT 3.1.1 protocol
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [net] connect started
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [net] received connack
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [client] startCommsWorkers called
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [client] client is connected/reconnected
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [net] incoming started
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [net] startIncomingComms started
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [pinger] keepalive starting
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [net] outgoing started
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [net] startComms started
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [net] outgoing waiting for an outbound message
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [client] startCommsWorkers done
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [store] enter Resume
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [store] exit resume
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [client] exit startClient
2023-09-13T22:28:37Z I! [inputs.mqtt_consumer] Connected [tcp://mqtt:1883]
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] Session found [tcp://mqtt:1883]
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [net] startIncomingComms: inboundFromStore complete
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:28:37Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:28:38Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:28:38Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:28:38Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:28:38Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:28:39Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:28:39Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:28:39Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:28:39Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:28:40Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:28:40Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:28:40Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:28:40Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:28:41Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:28:41Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:28:41Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:28:41Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:28:42Z D! [inputs.mqtt_consumer] [pinger] ping check5.000416747
2023-09-13T22:28:42Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:28:42Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:28:42Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:28:42Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:28:43Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:28:43Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:28:43Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:28:43Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:28:44Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:28:44Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:28:44Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:28:44Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:28:45Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:28:45Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:28:45Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:28:45Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:28:46Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:28:46Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:28:46Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:28:46Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
giordon,host=itkpix-srv,institute=scipp,location=cleanroom_test,make=Digikey,model=HYT271,stand=beta,topic=scipp/cleanroom_test/monitoring/beta/giordon value=27.556064315899935 1694644117000000000
2023-09-13T22:28:47Z D! [inputs.mqtt_consumer] [pinger] ping check10.000507401
2023-09-13T22:28:47Z D! [outputs.file] Wrote batch of 1 metrics in 60.408µs
2023-09-13T22:28:47Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2023-09-13T22:28:47Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:28:47Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:28:47Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:28:47Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:28:48Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:28:48Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:28:48Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:28:48Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:28:49Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:28:49Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:28:49Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:28:49Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:28:50Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:28:50Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:28:50Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:28:50Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:28:51Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:28:51Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:28:51Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:28:51Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:28:52Z D! [inputs.mqtt_consumer] [pinger] ping check15.000895715
2023-09-13T22:28:52Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:28:52Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:28:52Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:28:52Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:28:53Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:28:53Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:28:53Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:28:53Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:28:54Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:28:54Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:28:54Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:28:54Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:28:55Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:28:55Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:28:55Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:28:55Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:28:56Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:28:56Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:28:56Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:28:56Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:28:57Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2023-09-13T22:28:57Z D! [inputs.mqtt_consumer] [pinger] ping check20.000908274
2023-09-13T22:28:57Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:28:57Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:28:57Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:28:57Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:28:58Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:28:58Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:28:58Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:28:58Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:28:59Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:28:59Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:28:59Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:28:59Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:29:00Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:29:00Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:29:00Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:29:00Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:29:01Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:29:01Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:29:01Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:29:01Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:29:02Z D! [inputs.mqtt_consumer] [pinger] ping check25.000623936
2023-09-13T22:29:02Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:29:02Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:29:02Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:29:02Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:29:03Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:29:03Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:29:03Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:29:03Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:29:04Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:29:04Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:29:04Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:29:04Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:29:05Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:29:05Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:29:05Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:29:05Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:29:06Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:29:06Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:29:06Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:29:06Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:29:07Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2023-09-13T22:29:07Z D! [inputs.mqtt_consumer] [pinger] ping check30.001064074
2023-09-13T22:29:07Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:29:07Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:29:07Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:29:07Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:29:08Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:29:08Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:29:08Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:29:08Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:29:09Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:29:09Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:29:09Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:29:09Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:29:10Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:29:10Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:29:10Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:29:10Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:29:11Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:29:11Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:29:11Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:29:11Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:29:12Z D! [inputs.mqtt_consumer] [pinger] ping check35.000785237
2023-09-13T22:29:12Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:29:12Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:29:12Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:29:12Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:29:13Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:29:13Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:29:13Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:29:13Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:29:14Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:29:14Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:29:14Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:29:14Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:29:15Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:29:15Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:29:15Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:29:15Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:29:16Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:29:16Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:29:16Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2023-09-13T22:29:16Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:29:17Z D! [inputs.mqtt_consumer] [pinger] ping check40.001101971
2023-09-13T22:29:17Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
```

powersj commented 1 year ago

you mean = true right?

heh yes

startIncoming Received Message

Does appear that you are getting many messages so communication is good and the client is working. What this usually means is that your parsing of messages is failing given your json_v2 configuration.
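
To narrow down a suspected parsing problem, it can help to first confirm what a single payload contains at each configured path. The sketch below uses plain Python dictionary lookups rather than the GJSON path syntax that json_v2 uses, so it does not reproduce Telegraf's parser; it only shows the shape of the data. The function name `preview` and the sample values are made up for illustration.

```python
import json

# A payload shaped like the ones sent by the publisher script.
sample = json.dumps({
    "name": "giordon",
    "value": 12.5,              # arbitrary example reading
    "timestamp": 1694644117,    # Unix seconds
    "tags": [{"make": "Digikey", "model": "HYT271"}],
})


def preview(payload):
    """Show the values that the configured paths refer to in one payload."""
    doc = json.loads(payload)
    return {
        "measurement (name)": doc.get("name"),
        "field (value)": float(doc["value"]),
        "timestamp": int(doc["timestamp"]),
        # "tags" is a list of objects; merge them into one flat dict.
        "tags": {k: v for obj in doc.get("tags", []) for k, v in obj.items()},
    }


print(preview(sample))
```

Note that `timestamp` has one-second resolution and that `tags` is an array of objects rather than a flat object, two details worth keeping in mind when reading the parser configuration.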

kratsg commented 1 year ago

startIncoming Received Message

Does appear that you are getting many messages so communication is good and the client is working. What this usually means is that your parsing of messages is failing given your json_v2 configuration.

Is there a reason this silently fails? Also, interestingly, thought it might be related to messageID somehow. So I updated the "push" script to send with qos=1 and the output is a bit different!

I see lots of 2023-09-13T22:39:08Z D! [inputs.mqtt_consumer] [store] memorystore del: message69was deleted. Is there a way to force more verbosity with the json_v2 parser in any way?

logs with messages sent using qos=1 (but qos=2 remains in telegraf config)

```
2023-09-13T22:39:07Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2023-09-13T22:39:07Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:39:07Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:39:07Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:68
2023-09-13T22:39:07Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:39:07Z D! [inputs.mqtt_consumer] [net] putting puback msg on obound
2023-09-13T22:39:07Z D! [inputs.mqtt_consumer] [store] memorystore del: message68was deleted
2023-09-13T22:39:07Z D! [inputs.mqtt_consumer] [net] done putting puback msg on obound
2023-09-13T22:39:07Z D! [inputs.mqtt_consumer] [net] obound priority msg to write, type*packets.PubackPacket
2023-09-13T22:39:07Z D! [inputs.mqtt_consumer] [net] outgoing waiting for an outbound message
2023-09-13T22:39:08Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:39:08Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:39:08Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:69
2023-09-13T22:39:08Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:39:08Z D! [inputs.mqtt_consumer] [net] putting puback msg on obound
2023-09-13T22:39:08Z D! [inputs.mqtt_consumer] [store] memorystore del: message69was deleted
2023-09-13T22:39:08Z D! [inputs.mqtt_consumer] [net] done putting puback msg on obound
2023-09-13T22:39:08Z D! [inputs.mqtt_consumer] [net] obound priority msg to write, type*packets.PubackPacket
2023-09-13T22:39:08Z D! [inputs.mqtt_consumer] [net] outgoing waiting for an outbound message
2023-09-13T22:39:09Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:39:09Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:39:09Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:70
2023-09-13T22:39:09Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:39:09Z D! [inputs.mqtt_consumer] [net] putting puback msg on obound
2023-09-13T22:39:09Z D! [inputs.mqtt_consumer] [store] memorystore del: message70was deleted
2023-09-13T22:39:09Z D! [inputs.mqtt_consumer] [net] done putting puback msg on obound
2023-09-13T22:39:09Z D! [inputs.mqtt_consumer] [net] obound priority msg to write, type*packets.PubackPacket
2023-09-13T22:39:09Z D! [inputs.mqtt_consumer] [net] outgoing waiting for an outbound message
2023-09-13T22:39:10Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:39:10Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:39:10Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:71
2023-09-13T22:39:10Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:39:10Z D! [inputs.mqtt_consumer] [net] putting puback msg on obound
2023-09-13T22:39:10Z D! [inputs.mqtt_consumer] [store] memorystore del: message71was deleted
2023-09-13T22:39:10Z D! [inputs.mqtt_consumer] [net] done putting puback msg on obound
2023-09-13T22:39:10Z D! [inputs.mqtt_consumer] [net] obound priority msg to write, type*packets.PubackPacket
2023-09-13T22:39:10Z D! [inputs.mqtt_consumer] [net] outgoing waiting for an outbound message
2023-09-13T22:39:11Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:39:11Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:39:11Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:72
2023-09-13T22:39:11Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:39:11Z D! [inputs.mqtt_consumer] [net] putting puback msg on obound
2023-09-13T22:39:11Z D! [inputs.mqtt_consumer] [store] memorystore del: message72was deleted
2023-09-13T22:39:11Z D! [inputs.mqtt_consumer] [net] done putting puback msg on obound
2023-09-13T22:39:11Z D! [inputs.mqtt_consumer] [net] obound priority msg to write, type*packets.PubackPacket
2023-09-13T22:39:11Z D! [inputs.mqtt_consumer] [net] outgoing waiting for an outbound message
2023-09-13T22:39:12Z D! [inputs.mqtt_consumer] [pinger] ping check0.541366859
2023-09-13T22:39:12Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:39:12Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:39:12Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:73
2023-09-13T22:39:12Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:39:12Z D! [inputs.mqtt_consumer] [net] putting puback msg on obound
2023-09-13T22:39:12Z D! [inputs.mqtt_consumer] [store] memorystore del: message73was deleted
2023-09-13T22:39:12Z D! [inputs.mqtt_consumer] [net] done putting puback msg on obound
2023-09-13T22:39:12Z D! [inputs.mqtt_consumer] [net] obound priority msg to write, type*packets.PubackPacket
2023-09-13T22:39:12Z D! [inputs.mqtt_consumer] [net] outgoing waiting for an outbound message
2023-09-13T22:39:13Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:39:13Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:39:13Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:74
2023-09-13T22:39:13Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:39:13Z D! [inputs.mqtt_consumer] [net] putting puback msg on obound
2023-09-13T22:39:13Z D! [inputs.mqtt_consumer] [store] memorystore del: message74was deleted
2023-09-13T22:39:13Z D! [inputs.mqtt_consumer] [net] done putting puback msg on obound
2023-09-13T22:39:13Z D! [inputs.mqtt_consumer] [net] obound priority msg to write, type*packets.PubackPacket
2023-09-13T22:39:13Z D! [inputs.mqtt_consumer] [net] outgoing waiting for an outbound message
2023-09-13T22:39:14Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:39:14Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:39:14Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:75
2023-09-13T22:39:14Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:39:14Z D! [inputs.mqtt_consumer] [net] putting puback msg on obound
2023-09-13T22:39:14Z D! [inputs.mqtt_consumer] [store] memorystore del: message75was deleted
2023-09-13T22:39:14Z D! [inputs.mqtt_consumer] [net] done putting puback msg on obound
2023-09-13T22:39:14Z D! [inputs.mqtt_consumer] [net] obound priority msg to write, type*packets.PubackPacket
2023-09-13T22:39:14Z D! [inputs.mqtt_consumer] [net] outgoing waiting for an outbound message
2023-09-13T22:39:15Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:39:15Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:39:15Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:76
2023-09-13T22:39:15Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:39:15Z D! [inputs.mqtt_consumer] [net] putting puback msg on obound
2023-09-13T22:39:15Z D! [inputs.mqtt_consumer] [store] memorystore del: message76was deleted
2023-09-13T22:39:15Z D! [inputs.mqtt_consumer] [net] done putting puback msg on obound
2023-09-13T22:39:15Z D! [inputs.mqtt_consumer] [net] obound priority msg to write, type*packets.PubackPacket
2023-09-13T22:39:15Z D! [inputs.mqtt_consumer] [net] outgoing waiting for an outbound message
2023-09-13T22:39:16Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2023-09-13T22:39:16Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2023-09-13T22:39:16Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:77
2023-09-13T22:39:16Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2023-09-13T22:39:16Z D! [inputs.mqtt_consumer] [net] putting puback msg on obound
2023-09-13T22:39:16Z D! [inputs.mqtt_consumer] [store] memorystore del: message77was deleted
2023-09-13T22:39:16Z D! [inputs.mqtt_consumer] [net] done putting puback msg on obound
2023-09-13T22:39:16Z D! [inputs.mqtt_consumer] [net] obound priority msg to write, type*packets.PubackPacket
2023-09-13T22:39:16Z D! [inputs.mqtt_consumer] [net] outgoing waiting for an outbound message
2023-09-13T22:39:17Z D! [inputs.mqtt_consumer] [pinger] ping check0.53545075
```
kratsg commented 1 year ago

Sorry, one more comment: I fail to see how the JSON parsing could fail after succeeding at least once. I still see this metric show up once and never again:

giordon,host=itkpix-srv,institute=scipp,location=cleanroom_test,make=Digikey,model=HYT271,stand=beta,topic=scipp/cleanroom_test/monitoring/beta/giordon value=15.495400003055648 1694642499000000000

so the JSON parsing has to be correct... unless it somehow fails the next time it runs?

kratsg commented 1 year ago

Getting further: since you pointed out it might be something json_v2 related, I went ahead and commented out the portion that adds tags from the message, so the config looks like

  [[inputs.mqtt_consumer.json_v2]]
    measurement_name_path = "name"
    timestamp_path = "timestamp"
    timestamp_format = "unix"
    [[inputs.mqtt_consumer.json_v2.field]]
      path = "value"
      type = "float"
    #[[inputs.mqtt_consumer.json_v2.object]]
    #  path = "@this"
    #  disable_prepend_keys = true
    #  excluded_keys = ["name", "value"]
    #  [[inputs.mqtt_consumer.json_v2.object.tag]]
    #    path = "tags"

as a reminder those messages being sent look like

scipp/cleanroom_test/monitoring/beta/giordon b'{"name": "giordon", "value": 48.297650533906676, "timestamp": 1694645994, "tags": [{"make": "Digikey", "model": "HYT271"}]}'

and now I start getting all the messages coming in -- this part worries me, because you are likely right that the parsing is failing (somehow?) but it's silently failing and I have no idea why.

2023-09-13T22:57:07Z D! [inputs.mqtt_consumer] [pinger]  ping check5.00070301
2023-09-13T22:57:08Z D! [inputs.mqtt_consumer] [net]     startIncoming Received Message
2023-09-13T22:57:08Z D! [inputs.mqtt_consumer] [net]     startIncomingComms: got msg on ibound
2023-09-13T22:57:08Z D! [inputs.mqtt_consumer] [net]     startIncomingComms: received publish, msgId:1148
2023-09-13T22:57:08Z D! [inputs.mqtt_consumer] [net]     logic waiting for msg on ibound
2023-09-13T22:57:09Z D! [inputs.mqtt_consumer] [net]     startIncoming Received Message
2023-09-13T22:57:09Z D! [inputs.mqtt_consumer] [net]     startIncomingComms: got msg on ibound
2023-09-13T22:57:09Z D! [inputs.mqtt_consumer] [net]     startIncomingComms: received publish, msgId:1149
2023-09-13T22:57:09Z D! [inputs.mqtt_consumer] [net]     logic waiting for msg on ibound
2023-09-13T22:57:10Z D! [inputs.mqtt_consumer] [net]     startIncoming Received Message
2023-09-13T22:57:10Z D! [inputs.mqtt_consumer] [net]     startIncomingComms: got msg on ibound
2023-09-13T22:57:10Z D! [inputs.mqtt_consumer] [net]     startIncomingComms: received publish, msgId:1150
2023-09-13T22:57:10Z D! [inputs.mqtt_consumer] [net]     logic waiting for msg on ibound
2023-09-13T22:57:11Z D! [inputs.mqtt_consumer] [net]     startIncoming Received Message
2023-09-13T22:57:11Z D! [inputs.mqtt_consumer] [net]     startIncomingComms: got msg on ibound
2023-09-13T22:57:11Z D! [inputs.mqtt_consumer] [net]     startIncomingComms: received publish, msgId:1151
2023-09-13T22:57:11Z D! [inputs.mqtt_consumer] [net]     logic waiting for msg on ibound
2023-09-13T22:57:12Z D! [inputs.mqtt_consumer] [net]     startIncoming Received Message
2023-09-13T22:57:12Z D! [inputs.mqtt_consumer] [net]     startIncomingComms: got msg on ibound
2023-09-13T22:57:12Z D! [inputs.mqtt_consumer] [net]     startIncomingComms: received publish, msgId:1152
2023-09-13T22:57:12Z D! [inputs.mqtt_consumer] [net]     logic waiting for msg on ibound
2023-09-13T22:57:12Z D! [inputs.mqtt_consumer] [pinger]  ping check10.000075722
giordon,host=itkpix-srv,institute=scipp,location=cleanroom_test,stand=beta,topic=scipp/cleanroom_test/monitoring/beta/giordon value=40.53155196893359 1694645728000000000
giordon,host=itkpix-srv,institute=scipp,location=cleanroom_test,stand=beta,topic=scipp/cleanroom_test/monitoring/beta/giordon value=28.087525602364703 1694645733000000000
giordon,host=itkpix-srv,institute=scipp,location=cleanroom_test,stand=beta,topic=scipp/cleanroom_test/monitoring/beta/giordon value=47.4573301890539 1694645817000000000
giordon,host=itkpix-srv,institute=scipp,location=cleanroom_test,stand=beta,topic=scipp/cleanroom_test/monitoring/beta/giordon value=4.173054154462591 1694645818000000000
giordon,host=itkpix-srv,institute=scipp,location=cleanroom_test,stand=beta,topic=scipp/cleanroom_test/monitoring/beta/giordon value=20.061537884708734 1694645819000000000
giordon,host=itkpix-srv,institute=scipp,location=cleanroom_test,stand=beta,topic=scipp/cleanroom_test/monitoring/beta/giordon value=31.762575953516524 1694645820000000000
giordon,host=itkpix-srv,institute=scipp,location=cleanroom_test,stand=beta,topic=scipp/cleanroom_test/monitoring/beta/giordon value=12.090575443588424 1694645821000000000
giordon,host=itkpix-srv,institute=scipp,location=cleanroom_test,stand=beta,topic=scipp/cleanroom_test/monitoring/beta/giordon value=7.713461004272975 1694645822000000000
giordon,host=itkpix-srv,institute=scipp,location=cleanroom_test,stand=beta,topic=scipp/cleanroom_test/monitoring/beta/giordon value=35.168722778332004 1694645823000000000
giordon,host=itkpix-srv,institute=scipp,location=cleanroom_test,stand=beta,topic=scipp/cleanroom_test/monitoring/beta/giordon value=39.06668444683255 1694645824000000000
giordon,host=itkpix-srv,institute=scipp,location=cleanroom_test,stand=beta,topic=scipp/cleanroom_test/monitoring/beta/giordon value=31.501367193459274 1694645825000000000
giordon,host=itkpix-srv,institute=scipp,location=cleanroom_test,stand=beta,topic=scipp/cleanroom_test/monitoring/beta/giordon value=13.815335576582498 1694645826000000000
2023-09-13T22:57:12Z D! [outputs.file] Wrote batch of 18 metrics in 80.555µs
kratsg commented 1 year ago

I think the problem is with the json_v2.object portion (that's somehow breaking quietly). This variation does not work

...
    [[inputs.mqtt_consumer.json_v2.object]]
      path = "@this.tags"
      [[inputs.mqtt_consumer.json_v2.object.tag]]
        path = "*"

(only shows up one time). This also does not work

...
    [[inputs.mqtt_consumer.json_v2.object]]
      path = "@this"
      [[inputs.mqtt_consumer.json_v2.object.tag]]
        path = "tags.*"

This variation gets close, but the tags are considered fields, and not tags:

...
    [[inputs.mqtt_consumer.json_v2.object]]
      path = "@this.tags"
      disable_prepend_keys = false

so then I added

    [[inputs.mqtt_consumer.json_v2.object]]
      path = "@this.tags"
      disable_prepend_keys = false
      [[inputs.mqtt_consumer.json_v2.object.tag]]
        path = "*"

and get the singular output

giordon,host=itkpix-srv,institute=scipp,location=cleanroom_test,make=Digikey,stand=beta,topic=scipp/cleanroom_test/monitoring/beta/giordon value=47.8408014170121 1694647814000000000

but now I realize there must be a bug: my message has two tags, make and model, but the second one disappears, which indicates something is going wrong in the object.tag parsing. Avoiding object.tag and doing something like this for now

    [[inputs.mqtt_consumer.json_v2.object]]
      path = "@this.tags"
      tags = ["make", "model"]
      disable_prepend_keys = false

gives me

giordon,host=itkpix-srv,institute=scipp,location=cleanroom_test,make=Digikey,model=HYT271,stand=beta,topic=scipp/cleanroom_test/monitoring/beta/giordon value=5.899164092993503 1694647961000000000

which works (I get all my tags) but is unfortunately inflexible, in that it won't allow a GJSON expression in tags to pick up whatever tags are present. Yet, when I do this variation

    [[inputs.mqtt_consumer.json_v2.object]]
      path = "@this"
      excluded_keys = ["name", "value", "timestamp"]
      tags = ["tags"]
      disable_prepend_keys = false

I see that the keys I listed under tags are added as fields and not tags (which is #10576), and this doesn't match the corresponding example provided.

giordon,host=itkpix-srv,institute=scipp,location=cleanroom_test,stand=beta,topic=scipp/cleanroom_test/monitoring/beta/giordon tags_make="Digikey",tags_model="HYT271",value=47.202264937389906 1694648067000000000

So I think there's something deeply wrong with the tag handling in json_v2, in that the expected behavior just does not work. I will hard-code my tags for now, but this miiiiiight be related enough to #10576, except I'm just not getting any errors or indication that parsing went wrong.
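
For reference, the transformation being attempted throughout this comment can be sketched in plain Python. This is only an illustration of the intended result, not how json_v2 works internally: `to_line_protocol` is a hypothetical helper, the payload is the example message from above, and the sketch assumes tag keys and values need no line-protocol escaping.

```python
import json

# Example payload copied from the thread.
PAYLOAD = '{"name": "giordon", "value": 48.297650533906676, "timestamp": 1694645994, "tags": [{"make": "Digikey", "model": "HYT271"}]}'


def to_line_protocol(payload: str) -> str:
    """Hypothetical helper: flatten every object in "tags" into tag pairs."""
    msg = json.loads(payload)
    tags = {}
    for entry in msg.get("tags", []):
        tags.update(entry)  # each array element is an object of tag key/values
    # Tags are emitted sorted by key; no escaping is attempted (assumption).
    tag_part = "".join(f",{k}={v}" for k, v in sorted(tags.items()))
    ts_ns = int(msg["timestamp"]) * 1_000_000_000  # unix seconds -> nanoseconds
    return f'{msg["name"]}{tag_part} value={msg["value"]!r} {ts_ns}'


print(to_line_protocol(PAYLOAD))
```

The point of the sketch is that both `make` and `model` should become tags, whatever keys the `tags` objects happen to contain, while `value` stays a float field.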


This is my resulting final configuration for now

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "+/+/+/+/+"
    tags = "institute/location/_/stand/_"
    measurement = "_/_/_/_/measurement"

  [[inputs.mqtt_consumer.json_v2]]
    timestamp_path = "timestamp"
    timestamp_format = "unix"
    [[inputs.mqtt_consumer.json_v2.field]]
      path = "value"
      type = "float"
    [[inputs.mqtt_consumer.json_v2.tag]]
      path = "tags.make"
    [[inputs.mqtt_consumer.json_v2.tag]]
      path = "tags.model"
powersj commented 1 year ago

The json_v2 parser is not super flexible. Hard-coding each and every tag is ultimately what I suggest users do if they want to use json_v2. If you need more dynamic options, then using the xpath parser with json is the way to go.

For example, using your JSON:

{
    "name": "giordon",
    "value": 48.297650533906676,
    "timestamp": 1694645994,
    "tags": [
      {
        "make": "Digikey",
        "model": "HYT271"
      }
    ]
}

with this config:

[[inputs.file]]
  files = ['data']
  data_format = "xpath_json"
  xpath_native_types = true

  [[inputs.file.xpath]]
    timestamp = "/timestamp"
    timestamp_format = "unix"
    field_selection = "/value"
    tag_selection = "/tags/*/*"

produces the metric, which is what I think you are after:

file,make=Digikey,model=HYT271 value=48.297650533906676 1694645994000000000
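
To make the `/tags/*/*` selection easier to picture, here is a toy selector in Python. It is a simplified stand-in written for this example, not the XPath engine Telegraf uses: `children` and `select` are hypothetical names, and only `/`-separated names and `*` are supported. The first `*` steps into each element of the `tags` array and the second `*` picks every key of that element.

```python
import json

DOC = json.loads(
    '{"name": "giordon", "value": 48.297650533906676, "timestamp": 1694645994, '
    '"tags": [{"make": "Digikey", "model": "HYT271"}]}'
)


def children(node):
    """Yield (name, child) pairs; array elements have no name of their own."""
    if isinstance(node, dict):
        yield from node.items()
    elif isinstance(node, list):
        for item in node:
            yield None, item


def select(doc, path):
    """Toy selector (hypothetical): supports only '/'-separated names and '*'."""
    current = [(None, doc)]
    for step in path.strip("/").split("/"):
        # Keep every child whose name matches the step, or all children for '*'.
        current = [
            (name, child)
            for _, node in current
            for name, child in children(node)
            if step == "*" or step == name
        ]
    return current


print(select(DOC, "/tags/*/*"))
```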
kratsg commented 1 year ago

This actually gives me

file,make=Digikey,model=HYT271 value="48.297650533906676" 1694645994000000000

rather than a float for the value of the field. I'm still trying to figure out how to do this (documentation is unfortunately lacking, and I don't use xpath as much as, say, jq/GJSON). I tried number(/value) and /value | number, neither of which works. This is the only variation I see working:

# see https://github.com/influxdata/telegraf/issues/10576#issuecomment-1716370311
[[inputs.mqtt_consumer]]
  servers = ["tcp://mqtt:1883"]
  topics = [
    "monitoring/#"
  ]

  ## The message topic will be stored in a tag specified by this value.  If set
  ## to the empty string no topic tag will be created.
  topic_tag = "topic"
  qos = 0
  persistent_session = true
  client_id = "telegraf"
  client_trace = true
  data_format = "xpath_json"
  username = "telegraf"
  password = "telegraf"

  [[inputs.mqtt_consumer.topic_parsing]]
    # e.g. monitoring/scipp/pixel_modules/cleanroom/beta/chuck/temperature
    # or   monitoring/scipp/pixel_modules/cleanroom/alpha/interlock/defcon
    topic = "+/+/+/+/+/+/+"
    tags = "_/institute/project/room/stand/target/_"
    measurement = "_/_/_/_/_/_/measurement"

  [[inputs.mqtt_consumer.xpath]]
    timestamp = "/timestamp"
    timestamp_format = "unix"
    tag_selection = "/tags/*"
    [inputs.mqtt_consumer.xpath.fields]
      value = "number(/value)"

which gives me

giordon,host=itkpix-srv,institute=scipp,make=Digikey,model=HYT271,project=pixel_modules,room=cleanroom,stand=beta,target=chuck,topic=monitoring/scipp/pixel_modules/cleanroom/beta/chuck/giordon value=40.20702488194314 1694707359000000000

as expected (measurement is giordon from the topic).
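
The topic-to-tag mapping visible in that output can be sketched as follows. This is a minimal illustration that assumes segments are matched purely by position; `parse_topic` is a hypothetical helper and not Telegraf's implementation.

```python
def parse_topic(topic, tags_pattern, measurement_pattern):
    """Hypothetical helper mirroring the mapping seen in the output above."""
    parts = topic.split("/")
    tag_keys = tags_pattern.split("/")
    meas_keys = measurement_pattern.split("/")
    if not (len(parts) == len(tag_keys) == len(meas_keys)):
        raise ValueError("topic and patterns must have the same number of segments")
    # "_" marks a segment to ignore; any other name becomes the tag key.
    tags = {key: part for key, part in zip(tag_keys, parts) if key != "_"}
    # The segment aligned with the word "measurement" names the metric.
    measurement = next(
        (part for key, part in zip(meas_keys, parts) if key == "measurement"), None
    )
    return measurement, tags


measurement, tags = parse_topic(
    "monitoring/scipp/pixel_modules/cleanroom/beta/chuck/giordon",
    "_/institute/project/room/stand/target/_",
    "_/_/_/_/_/_/measurement",
)
print(measurement, tags)
```

With the topic from the output above, the last segment becomes the measurement and the five middle segments become the institute, project, room, stand and target tags.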

kratsg commented 1 year ago

This is resolved with the switch from json_v2 to xpath_json. Particularly, it helps to:

Note or understand that the documentation for some of the parsers is not perfect or complete (e.g. in the case of json_v2 where it is understood that it's not great for dynamic tags/fields compared to xpath_json).