mqtt-tools / mqttwarn

A highly configurable MQTT message router, where the routing targets are notification plugins, primarily written in Python.
https://mqttwarn.readthedocs.io/
Eclipse Public License 2.0
957 stars 184 forks source link

Ingesting JSON payloads from air quality monitoring stations into Carbon #354

Closed lenny1972 closed 4 years ago

lenny1972 commented 5 years ago

Hi to Jan-Piet and to all the people that like me are excited by his great work on the MQTTWARN project. I'm using MQTTWARN in order to send to a Carbon back-end MQTT topics. I successfully send one topic to Carbon (Graphite/Whisper DB) but I failed to send a JSON object to Carbon. I'm a newby so probably there is something - stupid - that I didn't correctly configure/set in order to do it.

In the mqttwarn.log file I see clearly that the JSON object is taken in charge by the carbon service but nothing is then passed to Graphite/Whisper. Below a part of the log file hoping someone can help me in understand what I miss..

2019-01-06 18:40:06,353 DEBUG [mqttwarn] Message received on APA12/valore_tutti_sensori: {"Temperatura":11.96,"Umidita":48.22,"Pressione":1012.17,"PM1":27,"PM25":37,"PM10":40,"O3":21323.83,"NOX":29.96,"SO2":80.41,"CO2":981.73,"CO":1.23}
2019-01-06 18:40:06,353 DEBUG [mqttwarn] Section [+/valore_tutti_sensori] matches message on APA12/valore_tutti_sensori. Processing...
2019-01-06 18:40:06,353 DEBUG [mqttwarn] Message on APA12/valore_tutti_sensori going to carbon:c1
2019-01-06 18:40:06,354 DEBUG [mqttwarn] New `carbon:c1' job: APA12/valore_tutti_sensori
2019-01-06 18:40:06,354 DEBUG [mqttwarn] Processor #0 is handling: `carbon' for c1
2019-01-06 18:40:06,354 DEBUG [mqttwarn] Message on APA12/valore_tutti_sensori going to log:debug
2019-01-06 18:40:06,355 DEBUG [carbon] *** MODULE=services/carbon.pyc: service=carbon, target=c1
2019-01-06 18:40:06,355 DEBUG [carbon] Sending to carbon: APA12.valore_tutti_sensori {"Temperatura":11.96,"Umidita":48.22,"Pressione":1012.17,"PM1":27,"PM25":37,"PM10":40,"O3":21323.83,"NOX":29.96,"SO2":80.41,"CO2":981.73,"CO":1.23} 1546796406

Unfortunately I'm not skilled in Python and I cannot understand/read the carbon.py service file in order to know how to solve this problem.

For letting you understand the context, the project I want MQTTWARN to be integrated into deals with a set of pollution/environmental data monitoring stations (called APAx - in this case APA12 is the specific station) that collect air values (i.e. Temperature, Humidity, Pressure, CO2, etc..) and send them back to a central server that elaborates them a dashboard them for analysis. I want to simplify the messaging process and send just one data object contatining all the sensors' values....

Can someone helop me in addressing this problem?

Thanks a lot in advanced, Leonardo

jpmens commented 5 years ago

I don't think this is possible as is; looking at the carbon plugin, it takes a single metric (not a JSON payload) and sends that to a carbon server.

Please look at the example in our documentation.

You'll probably have to retrieve the MQTT payload (JSON), split that up into individual values and republish to MQTT to have mqttwarn process them, but this is not something mqttwarn is capable of out of the box.

amotl commented 5 years ago

Dear @lenny1972,

when reading about pollution/environmental data monitoring stations, especially in the context of collect[ing] air values, you are talking about a topic that resonates with us. @wetterfrosch might also be interested.

We are doing similar things within our Hiveeyes project [1,2], where we are also processing environmental and air quality sensor data coming in from custom sensors as well as from the luftdaten.info project [3,4]. Also, we are using Grafana [5] extensively for data visualization, you might enjoy having a look at some selected dashboards on [6,7].

If you want to simplify [your] messaging process and send just one data object containing all the sensors' values and this is something mqttwarn is not capable of, you might want to have a look at Kotori [8,9], which takes on the task of providing a flexible ingress data path for ingesting M2M/IoT/whatever telemetry data before conveniently diverting it to time series databases like InfluxDB [10]. Naturally, MQTT also plays an important role in its internal machinery. While Kotori might not yet exactly implement your requirements, which would probably well be possible, we might have a firsthand look what this could for you already.

We will be happy to have you on [2] for further discussions around environmental data monitoring in general while I am also open for specific questions on [9] when appropriate and if you will feel this might help you along. Just last year, we unlocked data acquisition from weeWX weather stations [11] - of course using MQTT again - into the DAQ system we are cautiously building over there. weeWX got its data from a Davis Vantage Pro2 in this case, but as far as I know, weeWX offers adapters to a plethora of weather stations from different vendors.

With kind regards, Andreas.

[1] https://hiveeyes.org/ [2] https://community.hiveeyes.org/ [3] https://luftdaten.info/ [4] https://github.com/hiveeyes/luftdatenpumpe [5] https://grafana.com/ [6] https://weather.hiveeyes.org/ [7] https://swarm.hiveeyes.org/ [8] https://getkotori.org/ [9] https://github.com/daq-tools/kotori [10] https://github.com/influxdata/influxdb [11] http://www.weewx.com/ [12] https://getkotori.org/docs/applications/weewx.html

rgitzel commented 5 years ago

@lenny1972

What you're asking for is perfectly understandable, but there's a fundamental problem: when do you send the combined message?

Say you have just two sensors, sending you messages once per minute. Do you... receive from one, then wait for the second one? what if they aren't in sync? What if one of the messages is lost (or not sent)? If one of the sensors goes down, do you still send the combined message with only one value, or do you wait indefinitely?

As @amotl suggests, a way around this is to simply collect the messages as received into a database (like InfluxDb), so that you have a record of every individual reading.

mqttwarn can do that already.

Then a separate process can query the database for all readings grouped by minute, and combine each group of readings (whether it's a complete set or not) into a single message.

If you are using InfluxDb, then Kapacitor can do this using a "TICK script", but a SQL query in a cron job could do just as well.

amotl commented 4 years ago

Dear @lenny1972,

For letting you understand the context, the project I want MQTTWARN to be integrated into deals with a set of pollution/environmental data monitoring stations (called APAx - in this case APA12 is the specific station) that collect air values (i.e. Temperature, Humidity, Pressure, CO2, etc..) and send them back to a central server that elaborates them a dashboard them for analysis.

Since answering your original request, luftdatenpumpe [1] received significant improvements and will now be able to handle multiple data sources. So, you might want to consider revisiting this in order to check if that would fit your bill in any way.

[2] and [3] are two examples of visualization dashboards which might come out of this. You might also enjoy visiting [4], [5] and [6].

With kind regards, Andreas.

[1] https://github.com/panodata/luftdatenpumpe [2] https://vmm.panodata.net/grafana/d/_JJ22OZZk/luftdaten-viewer-ldi-map [3] https://vmm.panodata.net/grafana/d/99jhzbTWz/wtf-all-compare-stations-overview-sds011-vs-irceline [4] https://community.panodata.org/t/hello-from-a-group-of-italian-citizens-interested-in-environment-and-environmental-measurements/29 [5] https://community.panodata.org/t/missing-data-of-my-luftdaten-info-sensor-with-ldi-station-id-19604-from-veglie-italy-on-the-grafana-map/26 [6] https://community.panodata.org/t/missing-data-of-my-luftdaten-info-sensor-with-ldi-station-id-19575-from-veglie-italy-on-the-grafana-map/49

jpmens commented 4 years ago

Closing as answered.