mqtt-tools / mqttwarn

A highly configurable MQTT message router, where the routing targets are notification plugins, primarily written in Python.
https://mqttwarn.readthedocs.io/
Eclipse Public License 2.0
958 stars 184 forks source link

Receiving and processing MQTT messages from Frigate NVR #632

Open sevmonster opened 1 year ago

sevmonster commented 1 year ago

I set up a Frigate -> Mosquitto -> mqttwarn -> Apprise -> Ntfy -> Nginx flow (whew) and everything is working... Almost. The filter seems to not be blocking bad messages even though it should be based on my debugging. And, I can't figure out how to send attachments. My only lead is to save the attachment to a local file before using the attach query parameter in my Ntfy URI, but it would be nice to be able to send it through the body of the message instead, which I believe Apprise supports.

Depending on if any of this not working is a bug or potential feature request, this issue could turn into an actual issue and not a support request :)

mqttwarn.ini

[defaults]
functions = funcs.py
launch    = apprise-ntfy

status_publish = True

[config:apprise-ntfy]
module   = apprise_single
baseuri  = ntfys://ntfy/frigate

[frigate/events]
filter  = frigate_events_filter()
alldata = frigate_events()
targets = apprise-ntfy
title   = {title}
format  = {format}
click   = {click}

funcs.py

# -*- coding: utf-8 -*-
from datetime import datetime

try:
    import json
except ImportError:
    import simplejson as json

def frigate_events(topic, data, srv=None):
    if srv is not None:
        srv.logging.debug('events ******************************************')
    a = json.loads(data['payload'])['after']
    f = lambda x: (y.replace('_', ' ') for y in x)
    r = {
      'camera': a['camera'],
      'label':  a['sub_label'] or a['label'],
      'current_zones': ', '.join(f(a['current_zones'])),
      'entered_zones': ', '.join(f(a['entered_zones'])),
      'time':   datetime.fromtimestamp(a['frame_time'])
    }
    r.update({
      'title':  f"{r['label']} entered {r['entered_zones']}",
      'format': f"In zones {r['current_zones']} at {r['time']}",
      'click':  f"https://frigate/events?camera={r['camera']}&label={r['label']}&zone={a['entered_zones'][0]}"
    })
    return r

def frigate_events_filter(topic, message, section, srv=None):
    if srv is not None:
        srv.logging.debug('filter ******************************************')
    try:
        message = json.loads(message)
    finally:
        message = None
    if message and message.get('type', None) != 'end' and 'after' in message:
        a = message['after']
        return False not in (x in a and (x == 'current_zones' or a[x]) for x in
                                  ('false_positive', 'camera', 'label',
                                   'current_zones', 'entered_zones',
                                   'frame_time'))
    return False

Sample payload

{
  "before": {
    "id": "1680791459.255384-abcdef",
    "camera": "camera",
    "frame_time": 1680791459.255384,
    "snapshot_time": 0,
    "label": "car",
    "sub_label": null,
    "top_score": 0,
    "false_positive": true,
    "start_time": 1680791459.255384,
    "end_time": null,
    "score": 0.7,
    "box": [
      0,
      20,
      0,
      20
    ],
    "area": 400,
    "ratio": 1,
    "region": [
      0,
      0,
      320,
      320
    ],
    "stationary": false,
    "motionless_count": 0,
    "position_changes": 0,
    "current_zones": [],
    "entered_zones": [],
    "has_clip": false,
    "has_snapshot": false
  },
  "after": {
    "id": "1680791459.255384-abcdef",
    "camera": "camera",
    "frame_time": 1680791506.638857,
    "snapshot_time": 1680791506.638857,
    "label": "car",
    "sub_label": null,
    "top_score": 0.75,
    "false_positive": false,
    "start_time": 1680791459.255384,
    "end_time": null,
    "score": 0.8,
    "box": [
      1,
      21,
      1,
      21
    ],
    "area": 400,
    "ratio": 1,
    "region": [
      0,
      0,
      320,
      320
    ],
    "stationary": false,
    "motionless_count": 1,
    "position_changes": 2,
    "current_zones": [],
    "entered_zones": [
      "zone1"
    ],
    "has_clip": true,
    "has_snapshot": true
  },
  "type": "new"
}
amotl commented 1 year ago

Dear sev,

thanks for writing in. I am not 100% sure, but I think the value of the message item within a JSON payload submitted via MQTT will get used as the body payload when forwarding messages to the corresponding service plugins.

You may want to play around with this command line incantation to see if "title" and "message" will get propagated properly.

mqttwarn --plugin=apprise \
    --config='{"baseuri": "ntfy://user:password@ntfy.example.org/topic1/topic2"}' \
    --options='{"addrs": [], "title": "Example notification", "message": "Hello world"}'

I do understand you correctly that putting something into the message body would fit your needs, right? Otherwise, please let me know if you are intentionally looking into putting something into the message's attachment instead. This may require a different approach, but is well worth to explore.

With kind regards, Andreas.

sevmonster commented 1 year ago

I do understand you correctly that putting something into the message body would fit your needs, right? Otherwise, please let me know if you are intentionally looking into putting something into the message's attachment instead. This may require a different approach, but is well worth to explore.

Sorry, I was speaking in reference to the Ntfy API. You can upload attachments via PUT request body. With Apprise, I think the only way to send an attachment is via URI, so you can disregard that. I really thought that there was another way to do it with Apprise, but I may have been mistaken.

For the remaining issue, filtering does not appear to work, so the unmodified JSON payload is displayed for non-matching messages, unless I am just using it wrong.

amotl commented 1 year ago

Hi again,

it should work to submit attachments via mqttwarn => Apprise => Ntfy. However, we have not tested it, so it will be sweet to explore this detail together.

The "Parameter Breakdown" section of the corresponding documentation of Apprise's Ntfy plugin tells us:

Variable Required Description
attach No Specify a web URL pointing at a remote attachment you would like the post to reference.
filename No This is only used if the attach was also provided. The ntfy server is smart enough to determine what the filename is automatically from this; however if you wish to provide a custom-override to this value, this is where you do it from.

-- https://github.com/caronc/apprise/wiki/Notify_ntfy#parameter-breakdown

So, maybe you are already successful with a dry-run snippet like this, using ?attach=https://httpbin.org/ip as an example?

mqttwarn --plugin=apprise \
    --config='{"baseuri": "ntfy://user:password@ntfy.example.org/topic1/topic2?attach=https://httpbin.org/ip"}' \
    --options='{"addrs": [], "title": "Example notification", "message": "Hello world"}'

If you can validate that any of those options to use the attach or filename parameters will work and fit your needs, we can explore how to propagate those parameters appropriately. I may think you want to use some field from the ingress message for that purpose?

With kind regards, Andreas.

amotl commented 1 year ago

we can explore how to propagate those parameters appropriately. I may think you want to use some field from the ingress message for that purpose?

Ah, you actually don't want to forward an external resource as attachment, but use the return value of what you produce in frigate_events() instead, after serializing the dictionary into JSON again?

Or do you want to use frigate_events() as an alldata transformer, as usual, and add the full ingress payload additionally as an attachment?

sevmonster commented 1 year ago

The image is sent in another MQTT payload, so it first needs to be ingressed by the alldata function. I have not gotten around to that yet. And yes, I was previously under the impression Apprise could take raw data attachments and pass them to a plugin, but upon further reading I do not believe that is possible. I am not sure if passing a file:// URI will open the file and read it, or just send the data. But I can get it to work regardless of either case, and I will also have to save the image to disk locally regardless of either case.

I am confident I can get that to work, and I have tested attachments via URI before (only with external resources fetched with HTTP however) so my main concern now is why the filter function isn't working, unless you believe there is some work that could be done to streamline the attachment process that should be focused on.

amotl commented 1 year ago

The image is sent in another MQTT payload, so it first needs to be ingressed by the alldata function.

Oh I see. So you will have to introduce a kind of stateful processing for that? Will be fun!

Apprise could take raw data attachments and pass them to a plugin, but upon further reading I do not believe that is possible.

I also think it is not possible. Maybe @caronc has different things to say about this?

I am not sure if passing a file:// URI will open the file and read it, or just send the data. But I can get it to work regardless of either case, and I will also have to save the image to disk locally regardless of either case.

Yeah, it will probably need some trial-and-error. Please let me know if you find any blockers.

Do you believe there is some work that could be done to streamline the attachment process?

My thoughts would have been around how to properly propagate an "attachment" parameter from the inbound MQTT message payload or topic into the transformation dict and then into the outbound parameters to the Apprise/Ntfy plugin. But after learning you want to submit the whole inbound message itself as an attachment, I don't think this was actually your need, right?

My main concern now is why the filter function isn't working.

All right, let's focus on this detail now.

sevmonster commented 1 year ago

We do indeed have a problem with the first part that I was hoping would be easy :) #634

The plan was to save the snapshot to file and provide a proxied URL (or file:// URI if it works) to Apprise, which Ntfy would then save to disk in its attachment cache. Like I said I was very confident that would work, but it seems I was overzealous.

amotl commented 1 year ago

I am just now getting a bit more into the details while I am looking into the snippets you provided. I will add them to the examples directory if you don't mind, so there will be a canonical reference to drive this forward.

I was hoping would be easy, but I am running into GH-634. The plan was to save the snapshot to file and provide a proxied URL (or file:// URI if it works) to Apprise, which Ntfy would then save to disk in its attachment cache.

That's sad. I was hoping that Frigate would provide an URL to the snapshot file within the "event" message already. But I see that Frigate itself has no capability to store those snapshot files anywhere, so it has no other chance than just submitting their payloads per MQTT, right? [^1]

We had two other discussions about submitting images in the past, but I think your scenario is new.

We do indeed have a problem with the first part.

Don't fret. Unless nothing serious will block this endeavour, I think we can make it work.

[^1]: Does Frigate offer any other options in this regard, like uploading to an S3-compatible object store and then publishing a corresponding event message to MQTT?

amotl commented 1 year ago

My main concern now is why the filter function isn't working.

All right, let's focus on this detail now.

While getting the code snippet to my machine, my editor's code highlighting immediately reveals why it is probably not working: Line 34 will always assign message = None. You probably want to say except there, instead of finally.

image

sevmonster commented 1 year ago

While getting the code snippet to my machine, the editor immediately reveals why it is probably not working: message will always be undefined, because line 34 will always assign None

Oops, that should be else not finally. But even in its current incarnation, should it not be stopping all messages since it always returns False? Why are messages getting through? Am I misunderstanding the filter function usage?

I did individually test the logic in the conditional with dummy data so the rest of it should be sound and do what I want, with the understanding that True is a passing filter and False fails.

I will add them to the examples directory if you don't mind, so there will be a canonical reference to drive this forward.

Feel free, but you may want to wait until everything works! Though, when the message comes in and matches all the criteria, the alldata does provide good output: Screenshot_20230412-084207

That's sad. I was hoping that Frigate would provide an URL to the snapshot file within the "event" message already. But I see that Frigate itself has no capability to store those snapshot files anywhere, so it has no other chance than just submitting them per MQTT?

I could just pull it from Frigate's files (all snapahots are stored in files, but the filenames are esoteric and I have not verified if the event message has enough data to compose the path) but I thought it might be easier to just rely fully on MQTT. I, too, was hoping it pushed events based on detected object ID and not the label, since there's a chance for a race condition with how I am approaching it now, but here we are.

Does Frigate offer any other options in this regard, like uploading to an S3-compatible object store and then publishing a corresponding event message to MQTT?

Frigate can optionally save snapshots to file, but that is a separate mechanism from the snapshot publishing to MQTT. I don't know if it offers object storage but I will lean towards no. (I wouldn't use it anyway.)

amotl commented 1 year ago

I will your code snippets to the examples directory if you don't mind, so there will be a canonical reference to drive this forward.

Feel free, but you may want to wait until everything works!

Sure. I will just add it to a branch first, so that you can use and test mqttwarn from there, including any improvements we will make while we go.

I thought it might be easier to just rely fully on MQTT.

Sure!

Frigate can optionally save snapshots to file, but that is a separate mechanism from the snapshot publishing to MQTT. I don't know if it offers object storage but I will lean towards no. (I wouldn't use it anyway.)

I wouldn't advise on using S3 at AWS, if this is what your concerns would be about. I was looking at MinIO here instead, which offers an S3-compatible interface. But if there is no MQTT message which will point out the URL, correlated with the event you are looking at, this is pointless.

Those are some corresponding items on the issue tracker of Frigate. It looks like all of them stalled, I am just referencing them for the sake of completeness, before moving on.

sevmonster commented 1 year ago

I wouldn't advise on using S3 at AWS, if this is what your concerns would be about.

Yep.

But if there is no MQTT message which will point out the URL, correlated with the event you are looking at, this is pointless.

I think the path can be constructed based on the payload. Frigate saves snapshots (previously clips) to clips/<cam>-<objectid>[-clean].<ext>. Since I save snapshots for all events, it should be enough to create the URI pointing to where that file will be/is. I will need to investigate if the file is written async or if it is written after the event is triggered, to see if I need to kludge a race condition or not.

amotl commented 1 year ago

I think the path can be constructed based on the payload.

That would be excellent, so mqttwarn would not need to process the image at all? So, the cam and objectid parameters could be used from the JSON message payload, i.e. ...?

  "after": {
    "id": "1680791459.255384-abcdef",
    "camera": "camera",
}

In this way, it would be a matter to find out how those parameters could be interpolated into the configured Apprise URL, right?

amotl commented 1 year ago

I've just added the code and configuration snippets you provided with GH-635. When running the example as outlined at README » Usage, I am getting a huge error output. Those are the first few lines with significant content.

2023-04-12 19:26:08,185 DEBUG    [mqttwarn.core             ] Message received on frigate/events: {
2023-04-12 19:26:08,185 DEBUG    [mqttwarn.core             ] Section [frigate/events] matches message on frigate/events, processing it
2023-04-12 19:26:08,185 DEBUG    [mqttwarn.context          ] filter ******************************************
2023-04-12 19:26:08,186 WARNING  [mqttwarn.context          ] Cannot invoke filter function 'frigate_events_filter' defined in 'frigate/events': Expecting property name enclosed in double quotes: line 1 column 2 (char 1)
2023-04-12 19:26:08,186 DEBUG    [mqttwarn.context          ] events ******************************************
2023-04-12 19:26:08,186 WARNING  [mqttwarn.context          ] Cannot invoke alldata function 'frigate_events' defined in 'frigate/events': Expecting property name enclosed in double quotes: line 1 column 2 (char 1)
2023-04-12 19:26:08,186 DEBUG    [mqttwarn.core             ] Cannot decode JSON object, payload={: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)
[...]
2023-04-12 19:18:30,233 ERROR    [mqttwarn.core             ] Formatting message with function '{title}' failed
Traceback (most recent call last):
  File "/path/to/mqttwarn/mqttwarn/core.py", line 357, in xform
    res = Formatter().format(function, **transform_data)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[...]
KeyError: 'title'
amotl commented 1 year ago

Oh I see. Such errors are apparently still hard to debug. It will be easier when tweaking mqttwarn.context a bit.

diff --git a/mqttwarn/context.py b/mqttwarn/context.py
index 1c1508e..ff1fa7f 100644
--- a/mqttwarn/context.py
+++ b/mqttwarn/context.py
@@ -62,6 +62,7 @@ class RuntimeContext:
             try:
                 return self.invoker.filter(filterfunc, topic, payload, section)
             except Exception as e:
+                raise
                 logger.warning("Cannot invoke filter function '%s' defined in '%s': %s" % (filterfunc, section, e))
         return False

It gives us the following output then.

2023-04-12 19:32:03,074 DEBUG    [mqttwarn.core             ] Message received on frigate/events: {
2023-04-12 19:32:03,075 DEBUG    [mqttwarn.core             ] Section [frigate/events] matches message on frigate/events, processing it
2023-04-12 19:32:03,075 DEBUG    [mqttwarn.context          ] filter ******************************************
Traceback (most recent call last):
  File "/path/to/mqttwarn/mqttwarn/core.py", line 643, in subscribe_forever
    mqttc.loop_forever()
[...]
  File "/path/to/mqttwarn/mqttwarn/context.py", line 63, in is_filtered
    return self.invoker.filter(filterfunc, topic, payload, section)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/mqttwarn/mqttwarn/context.py", line 204, in filter
    rc = func(topic, payload, section, self.srv)  # new version
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "frigate.py", line 32, in frigate_events_filter
[...]
  File "/path/to/mqttwarn/.venv311/lib/python3.11/site-packages/paho/mqtt/client.py", line 3570, in _handle_on_message
    on_message(self, self._userdata, message)
  File "/path/to/mqttwarn/mqttwarn/core.py", line 183, in on_message
    if context.is_filtered(section, topic, payload):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/mqttwarn/mqttwarn/context.py", line 63, in is_filtered
    return self.invoker.filter(filterfunc, topic, payload, section)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/mqttwarn/mqttwarn/context.py", line 204, in filter
    rc = func(topic, payload, section, self.srv)  # new version
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "frigate.py", line 32, in frigate_events_filter
    message = json.loads(message)
              ^^^^^^^^^^^^^^^^^^^

So, apparently, assuming message is an undecoded payload is wrong here?

amotl commented 1 year ago

Oh, nevermind, it's my fault. cat frigate-event.json | mosquitto_pub -t 'frigate/events' -l will not publish the message correctly, as it will publish individual lines of that file. Apologies for the noise.

Still, it reveals a spot where developing custom mqttwarn logic is painful, because the root cause is hidden by suppressing the corresponding full stack trace. We should think about improving this situation.

sevmonster commented 1 year ago

Yes, I also ran into this kind of issue when I was attempting to write the custom functionality. I had to enable the very verbose and hard to read debug messages to even get an inkling of what was wrong, and had to run my code in a standalone interpreter to catch syntax errors. I also feel the documentation for writing Python functions could be improved as I found it hard to read and navigate, though that's another issue.

I don't currently get any errors from running the alldata or filter functions when the event is fired properly, so your guess is as good as mine as to why it doesn't work.

sevmonster commented 1 year ago

I think the path can be constructed based on the payload.

That would be excellent, so mqttwarn would not need to process the image at all? So, the cam and objectid parameters could be used from the JSON message payload, i.e. ...?

  "after": {
    "id": "1680791459.255384-abcdef",
    "camera": "camera",
}

In this way, it would be a matter to find out how those parameters could be interpolated into the configured Apprise URL, right?

That is correct. But if the image is not written synchronously, it could be possible the image does not exist when the MQTT event is sent. I have not looked into this yet. As long as the image is written before the MQTT event is, there should be no problem approaching it in this way. However, this does require that mqttwarn and Frigate be on the same system and/or mqttwarn have immediate access to Frigate's snapshots.

Alternatively, the image can be hosted on Frigate behind a proxy, and a URL to that can be provided to Ntfy. Ntfy will download the image and store it in its attachment cache. This is less secure as there can be no authentication for the URL for Ntfy to download it. You could make the file path to contain a shared secret or algorithm between Frigate's reverse proxy and mqttwarn to help mitigate this. Or, you could block unexpected requests at the firewall level.

amotl commented 1 year ago

I've fixed the example with de00c85d85. Now, it seems to process the event perfectly well.

2023-04-12 20:10:56,576 DEBUG    [mqttwarn.core             ] Message received on frigate/events: {"before":{"id":"1680791459.255384-abcdef","camera":"camera","frame_time":1680791459.255384,"snapshot_time":0,"label":"car","sub_label":null,"top_score":0,"false_positive":true,"start_time":1680791459.255384,"end_time":null,"score":0.7,"box":[0,20,0,20],"area":400,"ratio":1,"region":[0,0,320,320],"stationary":false,"motionless_count":0,"position_changes":0,"current_zones":[],"entered_zones":[],"has_clip":false,"has_snapshot":false},"after":{"id":"1680791459.255384-abcdef","camera":"camera","frame_time":1680791506.638857,"snapshot_time":1680791506.638857,"label":"car","sub_label":null,"top_score":0.75,"false_positive":false,"start_time":1680791459.255384,"end_time":null,"score":0.8,"box":[1,21,1,21],"area":400,"ratio":1,"region":[0,0,320,320],"stationary":false,"motionless_count":1,"position_changes":2,"current_zones":[],"entered_zones":["zone1"],"has_clip":true,"has_snapshot":true},"type":"new"}
2023-04-12 20:10:56,576 DEBUG    [mqttwarn.core             ] Section [frigate/events] matches message on frigate/events, processing it
2023-04-12 20:10:56,576 DEBUG    [mqttwarn.context          ] filter ******************************************
2023-04-12 20:10:56,577 DEBUG    [mqttwarn.context          ] events ******************************************
2023-04-12 20:10:56,577 DEBUG    [mqttwarn.core             ] Message on frigate/events going to apprise-ntfy
2023-04-12 20:10:56,577 DEBUG    [mqttwarn.core             ] New `apprise-ntfy:None' job: frigate/events
2023-04-12 20:10:56,577 DEBUG    [mqttwarn.core             ] Processor #0 is handling: `apprise-ntfy' for None
2023-04-12 20:10:56,578 INFO     [mqttwarn.core             ] Invoking service plugin for `apprise-ntfy'
2023-04-12 20:10:56,578 DEBUG    [mqttwarn.services.apprise-ntfy] *** MODULE=/path/to/mqttwarn/mqttwarn/services/apprise_single.py: service=apprise-ntfy, target=None
2023-04-12 20:10:56,578 DEBUG    [mqttwarn.services.apprise-ntfy] Sending notification to Apprise. target=None, addresses=[]
2023-04-12 20:10:56,583 WARNING  [apprise                   ] Unsupported format specified In+zones++at+2023-04-06+16:31:46.638857
2023-04-12 20:10:56,583 DEBUG    [apprise                   ] Loaded ntfy URL: ntfys://ntfy/frigate?priority=default&mode=private&click=https%3A%2F%2Ffrigate%2Fevents%3Fcamera%3Dcamera%26label%3Dcar%26zone%3Dzone1&format=text&overflow=upstream&rto=4.0&cto=4.0&verify=yes
2023-04-12 20:10:56,583 DEBUG    [apprise                   ] ntfy POST URL: https://ntfy (cert_verify=True)
2023-04-12 20:10:56,584 DEBUG    [apprise                   ] ntfy Payload: {'topic': 'frigate', 'title': 'car entered zone1', 'message': 'In zones  at 2023-04-06 16:31:46.638857'}
2023-04-12 20:10:56,584 DEBUG    [apprise                   ] ntfy Headers: {'User-Agent': 'Apprise', 'Content-Type': 'application/json', 'X-Click': 'https://frigate/events?camera=camera&label=car&zone=zone1'}
2023-04-12 20:10:56,596 DEBUG    [urllib3.connectionpool    ] Starting new HTTPS connection (1): ntfy:443
2023-04-12 20:10:56,601 WARNING  [apprise                   ] A Connection error occurred sending ntfy:https://ntfy notification.
2023-04-12 20:10:56,601 DEBUG    [apprise                   ] Socket Exception: HTTPSConnectionPool(host='ntfy', port=443): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x1126644d0>: Failed to establish a new connection: [Errno 8] nodename nor servname provided, or not known'))
2023-04-12 20:10:56,601 DEBUG    [asyncio                   ] Using selector: KqueueSelector
2023-04-12 20:10:56,601 INFO     [apprise                   ] Notifying 1 service(s) asynchronously.
2023-04-12 20:10:56,602 ERROR    [mqttwarn.services.apprise-ntfy] Sending message using Apprise failed
2023-04-12 20:10:56,602 WARNING  [mqttwarn.core             ] Notification of apprise-ntfy for `frigate/events' FAILED or TIMED OUT
2023-04-12 20:10:56,602 DEBUG    [mqttwarn.core             ] Job queue has 0 items to process

I don't currently get any errors from running the alldata or filter functions when the event is fired properly, so your guess is as good as mine as to why it doesn't work.

Can you specifically outline what does not work for you? Should the frigate-event.json actually fire an event, or not? In either case, it will be good if you can provide full examples of both variants, just to make the example complete.

sevmonster commented 1 year ago

You can do any of the following:

sevmonster commented 1 year ago

That is correct. But if the image is not written synchronously, it could be possible the image does not exist when the MQTT event is sent. I have not looked into this yet. As long as the image is written before the MQTT event is, there should be no problem approaching it in this way. However, this does require that mqttwarn and Frigate be on the same system and/or mqttwarn have immediate access to Frigate's snapshots.

Alternatively, the image can be hosted on Frigate behind a proxy, and a URL to that can be provided to Ntfy. Ntfy will download the image and store it in its attachment cache. This is less secure as there can be no authentication for the URL for Ntfy to download it. You could make the file path to contain a shared secret or algorithm between Frigate's reverse proxy and mqttwarn to help mitigate this. Or, you could block unexpected requests at the firewall level.

Regrettably, I don't think this will work. Frigate only writes thumbnail to disk after the detection has ended. So I will have to use the initial solution of relying on the snapshots endpoint.

I glanced at Frigate to try and figure out how they publish MQTT events, but I have so far not really been able to demystify it. The thumbnail key is removed from the MQTT payload but it exists on the object, I haven't seen how that works or if there is a way to just include it with the payload, which would make all of this much easier...

amotl commented 1 year ago

You can do any of the following [to create an event message which should be ignored] [...]

Thanks. With f35ccd16022, I've added corresponding minified example messages, and adjusted the frigate_events_filter function to make it work like I think you intended it.

The implementation became slightly more verbose compared to what you have been attempting with the list comprehension, but I think it is a good thing, in order to provide better log messages to the dear user, which - just in case - is nothing less as an audit log. In this particular use case, it may become important to exactly know why your event notification has been ignored or skipped.

amotl commented 1 year ago

Regrettably, I don't think this will work. Frigate only writes thumbnail to disk. So I will have to use the initial solution of relying on the snapshots endpoint.

Thanks for investigating. So be it. I will add corresponding code to the collab/frigate branch, on behalf of GH-635, if it's possible to tweak mqttwarn accordingly.

sevmonster commented 1 year ago

I saw in your latest commits with the docstring on the filter function, that True means that the object should be filtered... I think that's where my confusion was coming from, logic dictates to me that False means to not include the object while True means to include it. And since I was setting message to None unconditionally, that meant everything was getting displayed.

Oops.

amotl commented 1 year ago

[Regarding the image snaphost transport,] so be it. I will add corresponding code, attempting to...

May I ask in which order the messages are published to the broker? Does the event message come first, and the image snapshot message afterwards?

sevmonster commented 1 year ago

[Regarding the image snaphost transport,] so be it. I will add corresponding code, attempting to...

May I ask in which order the messages are published to the broker? Does the event message come first, and the image snapshot message afterwards?

That's what I was trying to figure out, I don't know yet. Based on what I am reading the snapshot is published when a detection moves to a true positive state, and from every event there on if the snapshot is considered better than the previous one. I am not sure which gets published first, I can observe to see.

Here's my new filter function.

def frigate_events_filter(topic, message, section, srv=None):
    try:
        message = json.loads(message)
    except:
        pass
    else:
        # can't parse the message
        return True

    # only process messages that match schema and have zone changes
    if (message.get('type', None) != 'end'
            and (a := message.get('after', None))
            and a.get('current_zones', None) != a.get('entered_zones', None)):
        return False in (
          x in a and (x == 'current_zones' or bool(a[x])) for x in
            ('false_positive', 'camera', 'label',
             'current_zones', 'entered_zones', 'frame_time'))

    return False
amotl commented 1 year ago

Does the event message come first, and the image snapshot message afterwards?

That's what I was trying to figure out

Oh, it's really easy to find out. Just subscribe to the MQTT broker on all topics.

mosquitto_sub -v -h localhost -t '#'
sevmonster commented 1 year ago

Oh, it's really easy to find out. Just subscribe to the MQTT broker on all topics.

Right, I just haven't done it yet :)

amotl commented 1 year ago

Here's my new filter function.

Thanks for sharing. I will add it to the feature branch. However, currently it skips both frigate-event-new.json and frigate-event-full.json samples. I think it will be really better to follow the more verbose style, and emit appropriate log messages on each relevant skip occasion.

I really can't decipher the logic from the code, is your comment »only process messages that match schema and have zone changes« a thorough description about what you are trying to achieve?

I think the first part is fine

    if (message.get('type', None) != 'end'
            and (a := message.get('after', None))
            and a.get('current_zones', None) != a.get('entered_zones', None)):

but please let's provide a more "unfolded" variant of the second part to readers of the tutorial I am using your use-case for.

          x in a and (x == 'current_zones' or bool(a[x])) for x in
            ('false_positive', 'camera', 'label',
             'current_zones', 'entered_zones', 'frame_time'))
amotl commented 1 year ago

However, currently it skips both frigate-event-new.json and frigate-event-full.json samples.

This code will always return True.

    try:
        message = json.loads(message)
    except:
        pass
    else:
        # can't parse the message
        return True

This code will be better, but still it will not inform the user by emitting a corresponding log message.

    try:
        message = json.loads(message)
    except:
        # can't parse the message
        return True

However, after fixing this, "good" messages will still be skipped. I've validated that another flaw must be in the list comprehension comparison function. Please unfold it ;].

sevmonster commented 1 year ago

I am doing a few things at once so my code could have been better. Here is it commentated and with your suggestions. Both this and my previous post have been untested (hence why it filtered new messages as I did not intend), but I paid more attention this time, and the below should work.

    try:
        message = json.loads(message)
    except ValueError as e:
        srv.logging.warning(f"Can't parse Frigate event message: {e}")
        return True

    # check the message type
    t = message.get('type', None)
    if t is None:
        srv.logging.warning('Frigate event: Missing message type')
        return True
    # ignore ending messages
    elif t == 'end':
        return True

    # payload must have 'after' key
    elif (a := message.get('after', None)) is None:
        srv.logging.warning("Frigate event: 'after' missing from payload")
        return True

    # validate the 'after' dict contains the values we need
    for x in ('false_positive', 'camera', 'label',
              'current_zones', 'entered_zones', 'frame_time'):
        # we can ignore if current_zones is empty, but all other keys should
        # be present and have values
        if x not in a or (x != 'current_zones' and not a[x]):
            srv.logging.warning(f"Frigate event: 'after.{x}' missing from payload or empty")
            return True

    # if this is an update message, ignore it if zones haven't changed
    # updates can happen for other reasons that we don't need to notify
    if t == 'update' and a['current_zones'] == a['entered_zones']:
        return True

    return False

This should have the same functionality as

    if (message.get('type', None) != 'end'
            and (a := message.get('after', None))
            and (messages.get('type', None) == 'new'
                 or a.get('current_zones', None) != a.get('entered_zones', None)):
        return False in (
          x in a and (x == 'current_zones' or bool(a[x])) for x in
            ('false_positive', 'camera', 'label',
             'current_zones', 'entered_zones', 'frame_time'))

When I get home later tonight I will test all of this to make sure it does what I think it should, but I am fairly confident now.

amotl commented 1 year ago

When I get home later tonight I will test all of this.

You are not actually running the code? This makes sense now ;].

The code will not work because not a[x] will work inversely on the value of the false_positive field to what you intended. You want to skip if it's True, right?

Let's please serialize validation rule evaluation, handle each field type individually, and provide good error messaging to the user and developer, so please build upon 58f716d587652 when sending further code snippets [^1].

The implementation now succeeds to let frigate-event-new.json and frigate-event-full.json pass, and skips frigate-event-end.json as well as frigate-event-false-positive.json, so I will go back at looking into GH-634 again.

[^1]: And please also only do after actually running it ;]. Saying that, I am very thankful for outlining the problem space to me, and that we have been able to iterate on this together.

amotl commented 1 year ago

Still, it reveals a spot where developing custom mqttwarn logic is painful, because the root cause is hidden by suppressing the corresponding full stack trace. We should think about improving this situation.

-- https://github.com/jpmens/mqttwarn/issues/632#issuecomment-1505681952

Yes, I also ran into this kind of issue when I was attempting to write the custom functionality. I had to enable the very verbose and hard to read debug messages to even get an inkling of what was wrong, and had to run my code in a standalone interpreter to catch syntax errors. I also feel the documentation for writing Python functions could be improved as I found it hard to read and navigate, though that's another issue.

-- https://github.com/jpmens/mqttwarn/issues/632#issuecomment-1505690686

c258840a2d7 tries to improve the situation in this regard.

amotl commented 1 year ago

Hi again,

I've unlocked receiving and processing MQTT messages with binary payloads, see https://github.com/jpmens/mqttwarn/issues/634#issuecomment-1506143409, and also unlocked configuring a filename_template setting on the apprise-ntfy service of your example with 63dda29b, in order to populate the attach parameter to Apprise/Ntfy. It is currently configured like filename_template = './var/media/{camera}-{label}.jpg', effectively interpolating the camera and label fields from the outcome of the transformation process within the frigate_events() alldata callback.

It may need some more adjustments, but it seems like the parameter propagation works up to the point where it actually notifies Apprise/Ntfy. There, it fails because I don't have a Ntfy daemon configured on my machine. I will certainly do, in order to complete the README accordingly, and maybe add a dedicated test case for the whole scenario, but maybe I can't find more time for that, so you may also step in to test the current implementation end-to-end?

With kind regards, Andreas.

sevmonster commented 1 year ago

Wow, you've gone much farther with this than I ever considered. I was perfectly content with my rinky-dink function (that I neglected to test and was filled with issues...), but this implementation is much more solid and effective. I have nothing to add to your class structure or control flow, it all looks good.

I left some feedback on the PR. You have also reminded me that a second argument of None to dict.get is not required since it defaults to None. (Has it always been like that? I thought it raised KeyError.)

sevmonster commented 1 year ago

Screenshot_20230413-210656 We have success. I had to make the changes I suggested in the PR to get the attachment to go through and prevent repeated notifications. But otherwise this is great.

I anticipate the attachment solution will not be rock solid as there is still a potential race condition if Frigate updates the snapshots repeatedly and quickly. But it works for now. A better solution would be to capture the picture as part of the payload via thumbnail, but that is not currently sent over MQTT. I plan to look into that at a later date when I have the time.

amotl commented 1 year ago

Hi again,

excellent that it works for you. We will bring in a few more improvements with GH-635, as discussed.

Independently of that, while we are currently enjoying a good collaboration, I would like to ask for your feedback about the documentation modernizations happening on behalf of GH-389 and GH-636, see https://mqttwarn.readthedocs.io/.

With kind regards, Andreas.

amotl commented 1 year ago

Hi again,

I've improved the corresponding implementations needed to support your use case, and I will be happy to receive any kind of feedback.

With kind regards, Andreas.

References

amotl commented 1 year ago

Dear @sevmonster,

GH-638 has been merged, and now there is also much progress with GH-639. Specifically, I've just added 2433cd461c, which synchronizes JSON event and snapshot image receive order, so that things can be controlled precisely.

Together with a few other improvements, for example that the configuration of text message templates has been pulled into the configuration file, the configuration now looks like this [^1]:

[config:ntfy]
targets = {
    'test': {
        'url': 'http://username:password@localhost:5555/frigate-testdrive',
        'file': '/tmp/mqttwarn-frigate-{camera}-{label}.png',
        'click': 'https://httpbin.org/anything?camera={event.camera}&label={event.label}&zone={event.entered_zones[0]}',
        # Wait for the file to arrive for three quarters of a second, and delete it after reading.
        '__settings__': {
            'file_retry_tries': 10,
            'file_retry_interval': 0.075,
            'file_unlink': True,
            }
        }
    }

[frigate/events]
filter  = frigate_events_filter()
alldata = frigate_events()
targets = ntfy:test
title   = {event.label} entered {event.entered_zones_str} at {event.time}
format  = {event.label} was in {event.current_zones_str} before

# Limit the alert based on camera/zone.
frigate_skip_rules = {
    'rule-1': {'camera': ['frontyard'], 'entered_zones': ['lawn']},
    }

With kind regards, Andreas.

[^1]: I've omitted the bottom half, you can inspect the full configuration file at frigate.ini.

amotl commented 1 year ago

mqttwarn 0.34.0 has been released, Frigate » Forward events and snapshots to ntfy has the corresponding documentation. I will be happy to hear back from you, about if you can validate it also works well on your end, or if we would need to add a few more adjustments.

When everything works well, we could add corresponding stubs to the integration pages on both the Frigate, and the ntfy documentation ^1.

amotl commented 1 year ago

In order to shed more light onto the current implementation, and to set the stage for subsequent actions on it, please consider those two fragments of the mqttwarn configuration file for the Frigate integration scenario, frigate.ini.

I was expecting that there would be the need to introduce concurrency and synchronization into the user-defined functions file frigate.py, in order to properly handle the two distinct MQTT messages emitted by Frigate, and process them properly, but it turned out that mqttwarn has all the machinery in place to make that happen without further ado.

Specifically, it is the num_workers setting [^1].

https://github.com/jpmens/mqttwarn/blob/b6ab307ceb0d07c011288f5edfaf8a16034df4ed/examples/frigate/frigate.ini#L10-L13

The other detail is the file option within the target address descriptor for attaching files to outbound ntfy notifications, and its newly introduced accompanying __settings__ properties [^2].

https://github.com/jpmens/mqttwarn/blob/b6ab307ceb0d07c011288f5edfaf8a16034df4ed/examples/frigate/frigate.ini#L24-L37

[^1]: The num_workers setting, made configurable with GH-192 the other day, can usually be used to increase the throughput capacity of mqttwarn, for example when needing to process a high number of events. However, in this case, it found another excellent application related to your use case which needed concurrent processing, by starting two concurrent tasks, where the one started first would have to wait for the other to complete - with an additional property that it should not block/wait forever. I think both use cases should be documented well - currently there is nothing about the num_workers setting in the documentation at all, and I think it's really powerful.

[^2]: By the way, while used exclusively within the ntfy service plugin up until now (see ntfy.py#L167-L172), the feature to gracefully wait for a file to appear on the filesystem for a specified amount of time, and optionally delete it afterwards, is now generally available as a utility function mqttwarn.util.load_file(), so it can be leveraged in similar scenarios as well. I think it should also be documented well.

sevmonster commented 1 year ago

Sorry, I have been away from this for a while. I will get read up on all the amazing work you all have been doing. From what I am seeing there is now a ntfy notifier which can send directly to ntfy server—that is amazing and I can't wait to try it out.