mqtt-tools / mqttwarn

A highly configurable MQTT message router, where the routing targets are notification plugins, primarily written in Python.
https://mqttwarn.readthedocs.io/
Eclipse Public License 2.0

Help with influxdb service..... #540

Closed psyciknz closed 3 years ago

psyciknz commented 3 years ago

I've been contributing to yet another service, something that takes all my Docker containers and publishes them via MQTT. Specifically, it reports the memory used and CPU of each container. (https://github.com/psyciknz/docker2mqtt)

Now I'd like to get that into InfluxDB, and thought about using the mqttwarn influxdb service.

Here's the example of a bunch of topics that I generate:

sensors/d2m/Melkor/logspout/name logspout
sensors/d2m/Melkor/logspout/image micahhausler/logspout:gelf
sensors/d2m/Melkor/logspout/status running
sensors/d2m/Melkor/logspout/state on
sensors/d2m/Melkor/logspout/memoryused 6.738
sensors/d2m/Melkor/logspout/memorylimit 10.0
sensors/d2m/Melkor/logspout/netinput 0.93359375
sensors/d2m/Melkor/logspout/netoutput 223.0
sensors/d2m/Melkor/logspout/blockinput 18.4
sensors/d2m/Melkor/logspout/blockoutput 0.0
sensors/d2m/Melkor/logspout/memory 6.738MiB / 10MiB
sensors/d2m/Melkor/logspout/cpu 0.84%
sensors/d2m/Melkor/logspout/netio 956kB / 223MB

Of note is that Melkor is a server, so this will change depending on the Docker host. logspout is the container. This is an example of a container on each host.

Ideally I'd want the memory used and cpu to go into an influx db with the server,container the measurement and the value.

I can't quite remember what I should have for fields and tags for this. The field is the cpu, and the tags are the server and container?
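A minimal sketch of that split, assuming InfluxDB line protocol (`measurement,tags fields`): the server and container go in as tags, and the numeric readings go in as fields. The helper name `to_line_protocol` and the measurement name `docker` are illustrative assumptions, and the sketch does no escaping of spaces or commas.

```python
def to_line_protocol(measurement, tags, fields):
    """Build one line-protocol record: measurement,tags fields."""
    # Tags describe where a value came from; sorted here for stable output.
    tag_part = ",".join(f"{key}={value}" for key, value in sorted(tags.items()))
    # Fields carry the measured values themselves.
    field_part = ",".join(f"{key}={value}" for key, value in fields.items())
    return f"{measurement},{tag_part} {field_part}"


line = to_line_protocol(
    "docker",  # hypothetical measurement name
    {"server": "Melkor", "container": "logspout"},
    {"cpu": 0.84, "memoryused": 6.738},
)
print(line)
```

With this layout, queries can group by server or container while aggregating the cpu and memoryused values.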

Anyway, this is not specifically an mqttwarn issue, but it might be a fun exercise. If I have to, I'll write to influx directly from the new service.

I've got the following for the config:

; -----------------------------------------
;             Docker2mqtt
; -----------------------------------------
; Docker 2 mqtt
[docker 2 mqtt]
;sensors/d2m/Melkor/d2mqtt/cpu 2.15%
topic = sensors/d2m/Melkor/#
targets = log:info, influxdb:docker2mqtt
format = server=Melkor,container= value=

Where I'm specifically setting the server, but if I could pull from sensors/d2m/# it would be better.

jacques42 commented 3 years ago

Hey - you may want to check out the Examples section to see how to use datamaps to extract information from the topic string into key/value pairs to be used in the format string for outbound. Maybe this already does what you are looking for. The value should be named, i.e. 'cpu=20'.

Here is an example from my config, which kind of exactly follows the example in the handbook and nicely works.

; path to file containing self-defined functions for formatmap and datamap
functions = '/etc/mqttwarn/myfunctions.py'

...

[octoPrint/+/event/PrintStarted]
targets = influxdb:influx-octoprint-event
datamap = OctoprintDataMap_Topic()
format = host={host},event="PrintStarted" filename="{name}"

In my case the second field of the topic string contains the server name and in the mqtt topic configuration the second field is matched by wildcard +

[octoPrint/+/event/PrintStarted]

So any origin server name will be matched. Each incoming message carries a specific server name in its topic string, and the datamap function extracts that name into a key/value pair for later use in the format string. (I forget what I intended to do by extracting the partname key/value, but it doesn't seem to be doing anything right now, so just ignore it.)

Datamap function located in file /etc/mqttwarn/myfunctions.py

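def OctoprintDataMap_Topic(topic, srv):
    # Extract host and part names from a topic such as octoPrint/<host>/event/PrintStarted
    srv.logging.debug("*** OctoprintDataMap_Topic() function call")
    try:
        parts = topic.split('/')
        hostname = parts[1]
        partname = parts[3]
    except Exception:
        # Fall back to placeholders if the topic is not shaped as expected
        hostname = 'unknown'
        partname = 'unknown'
    return dict(host=hostname, part=partname)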
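To illustrate how the extracted key ends up in the outbound line, here is a rough sketch. It assumes the datamap result is merged with the decoded JSON payload and that the format string is filled in with `str.format`-style substitution. The helper `topic_to_host`, the topic, and the payload values are made up for the example.

```python
def topic_to_host(topic):
    """Return the second topic level as the host, or 'unknown' if absent."""
    parts = topic.split("/")
    return parts[1] if len(parts) > 1 else "unknown"


# Format string taken from the config section above
fmt = 'host={host},event="PrintStarted" filename="{name}"'

# Hypothetical incoming message: a topic plus an already-decoded JSON payload
topic = "octoPrint/printer1/event/PrintStarted"
payload = {"name": "benchy.gcode"}

# Merge payload keys with the key derived from the topic, then substitute
data = dict(payload, host=topic_to_host(topic))
rendered = fmt.format(**data)
print(rendered)
```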

psyciknz commented 3 years ago

Awesome, thanks. That's a really good demo. I'll process that later. Or I may just change the payload format of the origin service to JSON, but I'll give your way a go first.

Wonder if you should package that up and get it added to the handbook as an example? @jpmens @amotl

psyciknz commented 3 years ago

So I can't see my datamap running. Topic: sensors/d2m/Melkor/paradoxpaiv2 Payload:

{"name": "paradoxpaiv2", "image": "paradoxalarminterface/pai:latest", "status": "running", "state": "on", "json": "{\"BlockIO\": \"5.19GB / 0B\", \"CPUPerc\": \"0.56%\", \"Container\": \"5d2c83686c6b\", \"ID\": \"5d2c83686c6b\", \"MemPerc\": \"68.60%\", \"MemUsage\": \"27.44MiB / 40MiB\", \"Name\": \"paradoxpaiv2\", \"NetIO\": \"1.9GB / 2.25GB\", \"PIDs\": \"15\"}", "memoryused": 27.44, "memorylimit": 40.0, "netinput": 1945.6, "netoutput": 2304.0, "blockinput": 5314.56, "blockoutput": 0.0, "memory": "27.44MiB / 40MiB", "cpu": "0.56%", "netio": "1.9GB / 2.25GB"}

Getting picked up by mqttwarn:

2021-06-24 09:20:06,197 DEBUG [core] Section [Docker2mqtt] matches message on sensors/d2m/drogo/podsync. Processing...
2021-06-24 09:20:06,204 DEBUG [influxdb] *** MODULE=/usr/local/lib/python3.9/site-packages/mqttwarn/services/influxdb.py: service=influxdb, target=docker2mqtt
2021-06-24 09:20:06,205 DEBUG [influxdb] http://influxdb.andc.nz:8096/write?db=docker&rp=autogen&precision=ns
2021-06-24 09:20:06,205 DEBUG [influxdb] docker,topic=sensors_d2m_drogo_podsync,container=podsync,server=unknown cpu=0.28,memory=28.32

And here is the mqttwarn.ini

[Docker2mqtt]
datamap = docker2mqtt_Topic()
targets = log:info, influxdb:docker2mqtt
format = container={name},server={host} cpu={cpu},memory={memoryused}

Which it's handling well, but server={host} (from the datamap) is staying empty.

Here's the datamap

def docker2mqtt_Topic(topic):
    if type(topic) == str:
        try:
            # sensors/d2m/drogo/photoprism_mariadb
            parts = topic.split('/')
            host = parts[1]
            #container = parts[4]
        except:
            host = 'unknown'
        return dict(host=host)
    return None

But I never see it do anything. Can you log from here? I tried print and it didn't seem to do much either.

jpmens commented 3 years ago

@psyciknz look at the function signature @jacques42 used: there's a srv as second argument, and he uses that to log a debug.
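A minimal sketch of that signature applied to the docker2mqtt datamap, not tested against mqttwarn itself. It reads the host from index 2, since index 1 of sensors/d2m/Melkor/paradoxpaiv2 is d2m. The `fake_srv` object is only a stand-in so the function can be exercised outside mqttwarn; it assumes nothing about `srv` beyond the `srv.logging.debug(...)` call used earlier in this thread.

```python
import logging
from types import SimpleNamespace


def docker2mqtt_Topic(topic, srv):
    """Datamap sketch: pull the Docker host out of sensors/d2m/<host>/<container>."""
    parts = topic.split("/")
    # Index 2 is the host; fall back to 'unknown' for shorter topics
    host = parts[2] if len(parts) > 2 else "unknown"
    srv.logging.debug("*** docker2mqtt_Topic(): host %s", host)
    return dict(host=host)


# Stand-in for the object mqttwarn passes in; only .logging is used here
logging.basicConfig(level=logging.DEBUG)
fake_srv = SimpleNamespace(logging=logging)

result = docker2mqtt_Topic("sensors/d2m/Melkor/paradoxpaiv2", fake_srv)
print(result)
```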

jacques42 commented 3 years ago

I don't really see why the topic matches from your mqttwarn.ini config, so I am a bit puzzled. I think it should look like the config below (please adjust it to match your setup exactly, if needed).

I also notice you want to take key/values from the payload, so we need to slightly change the approach and use the alldata function. Please note this change:

[sensors/d2m/+/+]
alldata = docker2mqtt_alldata()
targets = log:info, influxdb:docker2mqtt
format = container={name},server={host} cpu={cpu},memory={memoryused}

and the alldata function. You need to change the mqttwarn logging setting to DEBUG in the ini file in order to see its output in the logs:

import json

def docker2mqtt_alldata(topic, data, srv):
    # Default in case the topic is not a string
    host = 'unknown'
    if isinstance(topic, str):
        try:
            # sensors/d2m/Melkor/paradoxpaiv2
            parts = topic.split('/')
            host = parts[2]
            srv.logging.debug("*** docker2mqtt_alldata(): host %s", host)
        except Exception:
            host = 'unknown'
            container = 'unknown'
            return dict(host=host, container=container)

    # The payload is the JSON document published by docker2mqtt
    jsondata = json.loads(data["payload"])
    cpu = jsondata["cpu"]
    container = jsondata["name"]
    memoryused = jsondata["memoryused"]
    srv.logging.debug("*** docker2mqtt_alldata(): cpu %s container name %s memoryused %s", cpu, container, memoryused)

    return dict(host=host, name=container, cpu=cpu, memoryused=memoryused)

Note that I adjusted the config right here on GitHub and did not try it out, so apologies if there are syntax errors or anything.
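One detail to watch when pulling values from the payload: cpu arrives as a string such as "0.56%", while memoryused is already a number. The sketch below shows one way to normalise both before they are used as field values. The helper name `extract_metrics` is hypothetical, and stripping the percent sign is an assumption about what is wanted in the database.

```python
import json


def extract_metrics(payload_text):
    """Pick the InfluxDB-bound values out of a docker2mqtt JSON payload."""
    doc = json.loads(payload_text)
    # "cpu" is a string like "0.56%"; drop the trailing sign to get a number
    cpu = float(str(doc["cpu"]).rstrip("%"))
    return dict(name=doc["name"], cpu=cpu, memoryused=float(doc["memoryused"]))


# Abbreviated copy of the payload shown earlier in the thread
sample = '{"name": "paradoxpaiv2", "memoryused": 27.44, "cpu": "0.56%"}'
metrics = extract_metrics(sample)
print(metrics)
```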

psyciknz commented 3 years ago

I'll have to try that.

I managed to get it all going. I did find that there's a default topic tag passed. But it all got into influx as I wanted.

Using a json payload was the easiest way of running this.

Datamaps in a function didn't seem to work so well, and took lots of trial and error. If a srv can be the 2nd parameter then we might need to update some documentation to show that. I think I tried it just as a guess and it didn't work for me. But I hadn't seen the signature you are pointing me at.

jpmens commented 3 years ago

If a srv can be the 2nd parameter then we might need to update some documentation to show that

I believe there are examples in our documentation.

Glad you got it all going with @jacques42's help. I'll now close.