MetPX / sarracenia

https://MetPX.github.io/sarracenia
GNU General Public License v2.0

Possible feature request: additional fields in accept statements that can be used by plugins #1289

Open reidsunderland opened 2 weeks ago

reidsunderland commented 2 weeks ago

I've run into a couple of cases where it would be very helpful to let a user define additional fields to the right of accept statements, similar to how we have DESTFN now.

For example, I have this config:

accept      .*${HOSTNAME}.*/loggernet/.*T_Out.*.dat
accept      .*${HOSTNAME}.*/loggernet/.*T_TBRG.*.dat
accept      .*${HOSTNAME}.*/loggernet/.*T_RF1.*.dat
accept      .*${HOSTNAME}.*/loggernet/.*T_StnConfig.*.dat
accept      .*${HOSTNAME}.*/loggernet/.*final_storage_.*.dat

rawca2b_map loggernet/||CACN65 CWAO||(...)_T_Out.*.dat|(...)_T_TBRG.*.dat|(...)_T_RF1.*.dat|(...)_T_StnConfig.*.dat|(...)_final_storage_.*.dat

The fields are separated by ||. The first field is a string that is mostly redundant with the accept statements. The second field is the bulletin header that we want. The third field is a regex that identifies where the 3 character station ID is in the filename. It would be more user friendly (for both the plugin developer and config file author) to integrate this with the accept statements.
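To make the three-field layout concrete, here is a minimal sketch of how a plugin might split a rawca2b_map value today. The function names `parse_rawca2b_map` and `station_id` are hypothetical and the value is shortened to two patterns; this is not the actual plugin code.

```python
import re

def parse_rawca2b_map(value):
    """Split a rawca2b_map value into (path_prefix, header, compiled patterns).

    Hypothetical helper: fields are separated by '||', and the third field
    holds '|'-separated regexes whose first group is the station ID.
    """
    path_prefix, header, patterns = value.split("||", 2)
    return path_prefix, header, [re.compile(p) for p in patterns.split("|")]

prefix, header, patterns = parse_rawca2b_map(
    "loggernet/||CACN65 CWAO||(...)_T_Out.*.dat|(...)_T_TBRG.*.dat"
)

def station_id(filename):
    """Return the 3-character station ID from the first pattern that matches."""
    for pattern in patterns:
        m = pattern.search(filename)
        if m:
            return m.group(1)
    return None

print(header, station_id("Data_ABC_T_Out_123231535342.dat"))
```

Every pattern here repeats information that the accept lines already carry, which is the redundancy the proposal is meant to remove.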

Like this:

accept      .*${HOSTNAME}.*/loggernet/.*(...)_T_Out.*.dat            header=CACN65 CWAO
accept      .*${HOSTNAME}.*/loggernet/.*(...)_T_TBRG.*.dat           header=CACN65 CWAO
accept      .*${HOSTNAME}.*/loggernet/.*(...)_T_RF1.*.dat            header=CACN65 CWAO
accept      .*${HOSTNAME}.*/loggernet/.*(...)_T_StnConfig.*.dat      header=CACN65 CWAO
accept      .*${HOSTNAME}.*/loggernet/.*(...)_final_storage_.*.dat   header=CACN65 CWAO

I'm not sure how hard this would be to implement, and it's not a high priority. I'm just submitting this issue because it might be something worth looking into someday.

petersilva commented 2 weeks ago

Header is confusing in this context... the fields of a message are called "headers". For example, when someone wants to post a message with a custom header, they use: --header <key>=<value> ... but I think you mean the key is 'header', and something is putting a field called header in the message whose value is the WMO AHL (Abbreviated Header Line) of the message.

if someone were posting the message it would be...


sr3_cpost ... --header header="CACN65 CWAO"

The idea is to be able to match regexes against the values of specified fields in the message?

petersilva commented 2 weeks ago

we could add modifier... "matching" default would be like so:


accept <pattern> matching=${baseUrl}/${relPath} ....

but if you want to override the default for matching, you can put: matching={someFieldName}

petersilva commented 2 weeks ago

or maybe just in ... accept pattern in= ...

petersilva commented 2 weeks ago

But... yeah... I was thinking the accept has to match some different field... but that isn't what you are saying... you are saying the accept must match the filename pattern AND have the specified header value? or are you using || as or? so it needs to match the regex or have the specified header?

reidsunderland commented 2 weeks ago

Yes, header is a bad example. I don't want to change anything about how accepts behave in sr3. I just want a mechanism that allows us to define extra stuff that a plugin can use on a per-accept basis.

Assuming the thing on the right of an accept statement's regex is not a Sundew filename option, sr3 shouldn't understand or care what it is. I just want a way of passing that stuff to a plugin, and the plugin can do whatever it wants with it.

So replace header in my example with AHL, and it's up to the plugin to decide what to do with the AHL.

accept      .*${HOSTNAME}.*/loggernet/.*(...)_T_Out.*.dat            AHL=CACN65 CWAO
accept      .*${HOSTNAME}.*/loggernet/.*(...)_T_TBRG.*.dat           AHL=CACN65 CWAO
accept      .*${HOSTNAME}.*/loggernet/.*(...)_T_RF1.*.dat            AHL=CACN65 CWAO
accept      .*${HOSTNAME}.*/loggernet/.*(...)_T_StnConfig.*.dat      AHL=CACN65 CWAO
accept      .*${HOSTNAME}.*/loggernet/.*(...)_final_storage_.*.dat   AHL=CACN65 CWAO

In that specific example, the plugin already has access to msg['_matches'], which holds the regex match that caused the file to be accepted. So I want to be able to do something like this in the plugin:
new_file = f"{msg['stuff']['AHL']}__{msg['_matches'].group(1)}"
That plugin would produce a filename like CACN65 CWAO__ABC from an input relPath .../loggernet/Data_ABC_T_Out_123231535342.dat
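As a rough, standalone illustration of that renaming idea, the sketch below builds a stand-in message by hand. The 'stuff' key is the hypothetical per-accept field from the comment above (not an existing sr3 field), the ${HOSTNAME} part of the accept regex is left out for simplicity, and the relPath is the elided one quoted above, used as a literal string.

```python
import re

# Accept regex from the example, minus the ${HOSTNAME} part (an assumption
# made here so the snippet runs without variable substitution).
accept_regex = re.compile(r".*/loggernet/.*(...)_T_Out.*.dat")
rel_path = ".../loggernet/Data_ABC_T_Out_123231535342.dat"

# Stand-in for an sr3 message; a real message has many more fields.
msg = {
    "_matches": accept_regex.match(rel_path),
    "stuff": {"AHL": "CACN65 CWAO"},  # hypothetical field, not part of sr3
}

new_file = f"{msg['stuff']['AHL']}__{msg['_matches'].group(1)}"
print(new_file)  # CACN65 CWAO__ABC
```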


The other example is the Iridium sender that @tysonkaufmann worked on.

We currently have a config kind of like this:

iridium_key FQCN03_CWAO METWARN some_code_1
iridium_key FICN03_CWIS METWARN some_code_2

iridium_key FQCN04_CWAO METWARN some_code_3
iridium_key FICN04_CWIS METWARN some_code_4

on_msg iridium_plugin.py

mirror False

base_dir /apps/sarra/public_data

destination sftp://....
directory /

accept .*WMO-BULLETINS/FI/CWIS/.*/.*CWIS.*ice-am.*
accept .*MSC-BULLETINS/FQ/CWAO/.*/.*CWAO.*Issued.*

The plugin renames FQCN03_CWAO files to some_code_1 and so on. But there's redundancy between the accept statements and iridium_keys.

I think it could be cleaner to have a config like this:

accept .*MSC-BULLETINS/FQ/CWAO/.*/FQCN03_CWAO.*Issued.*   METWARN some_code_1
accept .*WMO-BULLETINS/FI/CWIS/.*/FICN03_CWIS.*ice-am.* METWARN some_code_2

accept .*MSC-BULLETINS/FQ/CWAO/.*/FQCN04_CWAO.*Issued.*   METWARN some_code_3
accept .*WMO-BULLETINS/FI/CWIS/.*/FICN04_CWIS.*ice-am.* METWARN some_code_4
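A sketch of what the plugin side of this could look like, written in plain Python rather than against the sr3 plugin API. The `accepts` list, the `extra_args_for` helper, and the sample path are all hypothetical, and it assumes the first matching accept wins and that the second extra argument is the new name.

```python
import re

# (compiled pattern, extra arguments) pairs in config order; hypothetical
# stand-ins for whatever sr3 would store per accept statement.
accepts = [
    (re.compile(r".*MSC-BULLETINS/FQ/CWAO/.*/FQCN03_CWAO.*Issued.*"),
     ["METWARN", "some_code_1"]),
    (re.compile(r".*WMO-BULLETINS/FI/CWIS/.*/FICN03_CWIS.*ice-am.*"),
     ["METWARN", "some_code_2"]),
]

def extra_args_for(path):
    """Return the extra arguments of the first accept whose pattern matches."""
    for pattern, args in accepts:
        if pattern.match(path):
            return args
    return None

# Made-up path, for illustration only.
args = extra_args_for("/x/MSC-BULLETINS/FQ/CWAO/12/FQCN03_CWAO_Issued_test")
new_name = args[1] if args else None
print(new_name)  # some_code_1
```

With this shape, the separate iridium_key lines are no longer needed, because each accept carries its own rename target.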

reidsunderland commented 2 weeks ago

I made some changes and I think I got it working the way I was hoping. All the additional parts of an accept statement get stored in the mask tuple, and now a pointer to the mask tuple is included in each message, so plugins have access to the accept mask that matched the message they are working with.

https://github.com/MetPX/sarracenia/compare/development...issue1289

Just as a test, I added these accept statements to the hpfx_amis example config.

accept .*CACN.* something=anotherthing 12345 another_thing=123
accept .*SACN.* DESTFN=fdsa stuffhere

From sr3 show:

 'masks': ["accept .*CACN.* into /tmp/hpfx_amis/ with mirror:False filename:None args:['something=anotherthing', '12345', 'another_thing=123']",
           "accept .*SACN.* into /tmp/hpfx_amis/ with mirror:False filename:DESTFN=fdsa args:['stuffhere']", 'reject .* into /tmp/hpfx_amis/ with mirror:False filename:None'],

And with an after_accept plugin that just prints the message to the log, it shows the _mask field is there, and a theoretical plugin can choose to do whatever it wants with msg['_mask'][-1].

2024-11-07 21:56:04,792 [INFO] 4011309 accept_thing after_accept MESSAGE: {'_format': 'v02',
         '_deleteOnPost': {'new_subtopic', 'new_relPath',  'subtopic', 'new_baseUrl', '_matches', 'new_file','new_dir', 'topic',  '_format', 'exchange',  'local_offset','ack_id', '_mask',  'post_format'},
         'sundew_extension': 'from_ncp_sr3:CWVH:SA:3:Direct:20241107215558',
         'from_cluster': 'DDSR.CMC',
         'to_clusters': 'ALL',
         'filename': 'msg_ddsr-WXO-DD3_e4560dd2ba53ef1494e213c6995a430e:from_ncp_sr3:CWVH:SA:3:Direct:20241107215558',
         'source': 'anonymous',
         'mtime': '20241107T215559.357',
         'atime': '20241107T215559.357',
         'pubTime': '20241107T215559.357',
         'baseUrl': 'https://hpfx.collab.science.gc.ca',
         'relPath': '/20241107/WXO-DD/bulletins/alphanumeric/20241107/SA/CWVH/21/SACN64_CWVH_072200__CWVH_22403',
         'subtopic': ['20241107',
         'WXO-DD',
         'bulletins',
         'alphanumeric',
         '20241107',
         'SA',
         'CWVH',
         '21'],
         'identity': {'method': 'md5',
         'value': 'zURjP9URWmCPwJ3lIurDSw=='},
         'size': 81,
         'exchange': 'xpublic',
         'topic': 'v02.post.20241107.WXO-DD.bulletins.alphanumeric.20241107.SA.CWVH.21',
         'ack_id': {'delivery_tag': 260,
         'channel_id': 2,
         'connection_id': 'cf075757-7abf-46ad-a213-e494afd4cf55_sub',
         'broker': 'hpfx.collab.science.gc.ca:5671//'},
         'local_offset': 0,
         '_matches': <re.Match object; span=(0,123),  match='https://hpfx.collab.science.gc.ca/20241107/WXO-DD>,
-->     '_mask': ('.*SACN.*', '/tmp/hpfx_amis/', 'DESTFN=fdsa', re.compile('.*SACN.*'), True, False, 0, False, '/', ['stuffhere']),
         'new_dir': '/tmp/hpfx_amis',
         'new_file': 'fdsa',
         'post_format': 'v03',
         'new_baseUrl': 'https://hpfx.collab.science.gc.ca',
         'new_relPath': 'tmp/hpfx_amis/fdsa',
         'new_subtopic': ['tmp',
         'hpfx_amis']}
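For what a plugin might do with msg['_mask'][-1], here is a small sketch that separates key=value tokens from bare ones. The helper name `split_accept_args` is hypothetical, and the tuple below only imitates the mask shown in the log, with None in place of the filename option and the compiled regex.

```python
def split_accept_args(args):
    """Separate 'key=value' tokens from bare tokens in an accept's extra args."""
    options, positional = {}, []
    for token in args:
        key, sep, value = token.partition("=")
        if sep:
            options[key] = value
        else:
            positional.append(token)
    return options, positional

# Imitation of the mask tuple; only the last element matters here.
mask = ('.*CACN.*', '/tmp/hpfx_amis/', None, None, True, False, 0, False, '/',
        ['something=anotherthing', '12345', 'another_thing=123'])

options, positional = split_accept_args(mask[-1])
print(options)     # {'something': 'anotherthing', 'another_thing': '123'}
print(positional)  # ['12345']
```

The sr3 show output suggests the extra arguments are split on whitespace, so a value containing a space (such as AHL=CACN65 CWAO) would presumably arrive as two tokens, and a plugin would need its own convention for rejoining them.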

petersilva commented 2 weeks ago

oh that's cool... not what I thought, but that looks fine. The thing I would worry about is what happens when persisting (retry queues). Typically, complex types are hard to serialize into JSON.

That's the only worry I would have...
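To illustrate the concern with the standard library's json module only (which is not necessarily what sr3's retry queue uses), the sketch below tries to serialize a tuple shaped like the _mask shown earlier. The helper `json_serializable` is hypothetical.

```python
import json
import re

# Tuple shaped like the _mask from the log, including a compiled regex.
mask = ('.*SACN.*', '/tmp/hpfx_amis/', 'DESTFN=fdsa', re.compile('.*SACN.*'),
        True, False, 0, False, '/', ['stuffhere'])

def json_serializable(value):
    """Return True if the standard json module can encode the value."""
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False

whole_mask_ok = json_serializable(mask)     # compiled regex is not encodable
args_only_ok = json_serializable(mask[-1])  # a list of strings is fine

# Tuples that do encode come back as lists, so the type is not preserved.
round_tripped = json.loads(json.dumps(mask[:3]))

print(whole_mask_ok, args_only_ok, type(round_tripped).__name__)
```

So whether the mask survives persistence depends on the serializer in use, which is why it is worth testing against the retry queue directly.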

petersilva commented 2 weeks ago

Also... you probably want varsub() to happen... so ${this} or ${that} gets evaluated also... (should test to see if it is or isn't already.)

reidsunderland commented 22 hours ago

I can't believe it was already 2 weeks ago when I was working on this. Variable substitution does already work 🎉

I need to test the retry queues.

accept .*CACN.* something=anotherthing 12345 another_thing=123 varsub=${BROKER_USER}

from sr3 show:

'masks': ["accept .*CACN.* into /tmp/hpfx_amis/ with mirror:False filename:None args:['something=anotherthing', '12345', 'another_thing=123', 'varsub=anonymous']",

reidsunderland commented 22 hours ago

Saving to the DiskQueue also works fine, because we're using jsonpickle.

In the diskqueue file:

"_mask": {"py/tuple": [".*CACN.*", "/tmp/hpfx_amis/", null, {"py/object": "re.Pattern", "pattern": ".*CACN.*"}, true, false, 0, false, "/", ["something=anotherthing", "12345", "another_thing=123", "varsub=anonymous"]]},

After being restored from diskqueue:

'_mask': ('.*CACN.*',
         '/tmp/hpfx_amis/',
         None,
         re.compile('.*CACN.*'),
         True,
         False,
         0,
         False,
         '/',
         ['something=anotherthing',
         '12345',
         'another_thing=123',
         'varsub=anonymous']),

petersilva commented 15 hours ago

looking good for a PR then, I guess!